Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

peer, main, netsync, blockchain: parallel block downloads #2226

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 43 additions & 14 deletions blockchain/accept.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package blockchain

import (
"fmt"
"sync"

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/database"
Expand Down Expand Up @@ -44,20 +45,36 @@ func (b *BlockChain) maybeAcceptBlock(block *btcutil.Block, flags BehaviorFlags)
return false, err
}

// Insert the block into the database if it's not already there. Even
// though it is possible the block will ultimately fail to connect, it
// has already passed all proof-of-work and validity tests which means
// it would be prohibitively expensive for an attacker to fill up the
// disk with a bunch of blocks that fail to connect. This is necessary
// since it allows block download to be decoupled from the much more
// expensive connection logic. It also has some other nice properties
// such as making blocks that never become part of the main chain or
// blocks that fail to connect available for further analysis.
err = b.db.Update(func(dbTx database.Tx) error {
return dbStoreBlock(dbTx, block)
})
if err != nil {
return false, err
// Store the block in parallel if we're in headers first mode. The
// headers were already checked and this block is under the checkpoint
// so it's safe to just add it to the database while the block
// validation is happening.
var wg sync.WaitGroup
var dbStoreError error
if flags&BFFastAdd == BFFastAdd {
go func() {
wg.Add(1)
defer wg.Done()
// Insert the block into the database if it's not already there. Even
// though it is possible the block will ultimately fail to connect, it
// has already passed all proof-of-work and validity tests which means
// it would be prohibitively expensive for an attacker to fill up the
// disk with a bunch of blocks that fail to connect. This is necessary
// since it allows block download to be decoupled from the much more
// expensive connection logic. It also has some other nice properties
// such as making blocks that never become part of the main chain or
// blocks that fail to connect available for further analysis.
dbStoreError = b.db.Update(func(dbTx database.Tx) error {
return dbTx.StoreBlock(block)
})
}()
} else {
err = b.db.Update(func(dbTx database.Tx) error {
return dbStoreBlock(dbTx, block)
})
if err != nil {
return false, err
}
}

// Create a new block node for the block and add it to the node index. Even
Expand Down Expand Up @@ -90,5 +107,17 @@ func (b *BlockChain) maybeAcceptBlock(block *btcutil.Block, flags BehaviorFlags)
b.sendNotification(NTBlockAccepted, block)
}()

// Wait until the block is saved. If there was a db error, then unset
// the data stored flag and flush the block index.
wg.Wait()
if dbStoreError != nil {
b.index.UnsetStatusFlags(newNode, statusDataStored)
err = b.index.flushToDB()
if err != nil {
return false, fmt.Errorf("%v. %v", err, dbStoreError)
}
return false, dbStoreError
}

return isMainChain, nil
}
16 changes: 10 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,26 @@ require (
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792
github.com/btcsuite/winsvc v1.0.0
github.com/davecgh/go-spew v1.1.1
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1
github.com/decred/dcrd/lru v1.0.0
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0
github.com/decred/dcrd/lru v1.1.2
github.com/gorilla/websocket v1.5.0
github.com/jessevdk/go-flags v1.4.0
github.com/jrick/logrotate v1.0.0
github.com/stretchr/testify v1.8.4
github.com/lightninglabs/neutrino v0.16.0
github.com/stretchr/testify v1.9.0
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
golang.org/x/crypto v0.22.0
golang.org/x/sys v0.19.0
)

require (
github.com/aead/siphash v1.0.1 // indirect
github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect
github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 // indirect
github.com/kkdai/bstream v1.0.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
golang.org/x/net v0.24.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down Expand Up @@ -64,3 +66,5 @@ retract (
)

go 1.17

replace github.com/lightninglabs/neutrino => /home/calvin/bitcoin-projects/btcd/neutrino
65 changes: 58 additions & 7 deletions go.sum

Large diffs are not rendered by default.

34 changes: 19 additions & 15 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/btcsuite/btcd/netsync"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/txscript"
"github.com/lightninglabs/neutrino/query"

"github.com/btcsuite/btclog"
"github.com/jrick/logrotate/rotator"
Expand Down Expand Up @@ -54,21 +55,22 @@ var (
// application shutdown.
logRotator *rotator.Rotator

adxrLog = backendLog.Logger("ADXR")
amgrLog = backendLog.Logger("AMGR")
cmgrLog = backendLog.Logger("CMGR")
bcdbLog = backendLog.Logger("BCDB")
btcdLog = backendLog.Logger("BTCD")
chanLog = backendLog.Logger("CHAN")
discLog = backendLog.Logger("DISC")
indxLog = backendLog.Logger("INDX")
minrLog = backendLog.Logger("MINR")
peerLog = backendLog.Logger("PEER")
rpcsLog = backendLog.Logger("RPCS")
scrpLog = backendLog.Logger("SCRP")
srvrLog = backendLog.Logger("SRVR")
syncLog = backendLog.Logger("SYNC")
txmpLog = backendLog.Logger("TXMP")
adxrLog = backendLog.Logger("ADXR")
amgrLog = backendLog.Logger("AMGR")
cmgrLog = backendLog.Logger("CMGR")
bcdbLog = backendLog.Logger("BCDB")
btcdLog = backendLog.Logger("BTCD")
chanLog = backendLog.Logger("CHAN")
discLog = backendLog.Logger("DISC")
indxLog = backendLog.Logger("INDX")
minrLog = backendLog.Logger("MINR")
peerLog = backendLog.Logger("PEER")
rpcsLog = backendLog.Logger("RPCS")
scrpLog = backendLog.Logger("SCRP")
srvrLog = backendLog.Logger("SRVR")
syncLog = backendLog.Logger("SYNC")
txmpLog = backendLog.Logger("TXMP")
queryLog = backendLog.Logger("QURY")
)

// Initialize package-global logger variables.
Expand All @@ -84,6 +86,7 @@ func init() {
txscript.UseLogger(scrpLog)
netsync.UseLogger(syncLog)
mempool.UseLogger(txmpLog)
query.UseLogger(queryLog)
}

// subsystemLoggers maps each subsystem identifier to its associated logger.
Expand All @@ -103,6 +106,7 @@ var subsystemLoggers = map[string]btclog.Logger{
"SRVR": srvrLog,
"SYNC": syncLog,
"TXMP": txmpLog,
"QURY": queryLog,
}

// initLogRotator initializes the logging rotater to write logs to logFile and
Expand Down
75 changes: 75 additions & 0 deletions netsync/blocklogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package netsync

import (
"fmt"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -82,3 +83,77 @@ func (b *blockProgressLogger) LogBlockHeight(block *btcutil.Block, chain *blockc
func (b *blockProgressLogger) SetLastLogTime(time time.Time) {
b.lastBlockLogTime = time
}

// peerLogger logs the progress of blocks downloaded from different peers during
// headers-first download.
type peerLogger struct {
lastPeerLogTime time.Time
peers map[string]int

subsystemLogger btclog.Logger
sync.Mutex
}

// newPeerLogger returns a new peerLogger with fields initialized.
func newPeerLogger(logger btclog.Logger) *peerLogger {
return &peerLogger{
lastPeerLogTime: time.Now(),
subsystemLogger: logger,
peers: make(map[string]int),
}
}

// LogPeers logs how many blocks have been received from which peers in the last
// 10 seconds.
func (p *peerLogger) LogPeers(peer string) {
p.Lock()
defer p.Unlock()

count, found := p.peers[peer]
if found {
count++
p.peers[peer] = count
} else {
p.peers[peer] = 1
}

now := time.Now()
duration := now.Sub(p.lastPeerLogTime)
if duration < time.Second*10 {
return
}
// Truncate the duration to 10s of milliseconds.
durationMillis := int64(duration / time.Millisecond)
tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10)

type peerInfo struct {
name string
count int
}

// Sort by blocks downloaded before printing.
var sortedPeers []peerInfo
for k, v := range p.peers {
sortedPeers = append(sortedPeers, peerInfo{k, v})
}
sort.Slice(sortedPeers, func(i, j int) bool {
return sortedPeers[i].count > sortedPeers[j].count
})

totalBlocks := 0
peerDownloadStr := ""
for _, sortedPeer := range sortedPeers {
peerDownloadStr += fmt.Sprintf("%d blocks from %v, ",
sortedPeer.count, sortedPeer.name)
totalBlocks += sortedPeer.count
}

p.subsystemLogger.Infof("Peer download stats in the last %s. total: %v, %s",
tDuration, totalBlocks, peerDownloadStr)

// Reset fields.
p.lastPeerLogTime = now
for k := range p.peers {
delete(p.peers, k)
}
}
3 changes: 3 additions & 0 deletions netsync/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,7 @@ type Config struct {
MaxPeers int

FeeEstimator *mempool.FeeEstimator

// ConnectedPeers returns all the currently connected peers.
ConnectedPeers func() []*peer.Peer
}
Loading
Loading