Skip to content

Commit

Permalink
netsync: handle notfound messages from peers
Browse files Browse the repository at this point in the history
backport from decred/dcrd#2253

When a peer sends a notfound message, remove the hash from requested
map.  Also increase notfound ban score and return early if it
disconnects the peer.
  • Loading branch information
tuxcanfly authored and jcvernaleo committed Jul 28, 2020
1 parent 69773a7 commit 24db7d7
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 6 deletions.
48 changes: 48 additions & 0 deletions netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ type headersMsg struct {
peer *peerpkg.Peer
}

// notFoundMsg packages a bitcoin notfound message and the peer it came from
// together so the block handler has access to that information.
type notFoundMsg struct {
notFound *wire.MsgNotFound
peer *peerpkg.Peer
}

// donePeerMsg signifies a newly disconnected peer to the block handler.
type donePeerMsg struct {
peer *peerpkg.Peer
Expand Down Expand Up @@ -1012,6 +1019,32 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
}
}

// handleNotFoundMsg handles notfound messages from all peers.
func (sm *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
peer := nfmsg.peer
state, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received notfound message from unknown peer %s", peer)
return
}
for _, inv := range nfmsg.notFound.InvList {
// verify the hash was actually announced by the peer
// before deleting from the global requested maps.
switch inv.Type {
case wire.InvTypeBlock:
if _, exists := state.requestedBlocks[inv.Hash]; exists {
delete(state.requestedBlocks, inv.Hash)
delete(sm.requestedBlocks, inv.Hash)
}
case wire.InvTypeTx:
if _, exists := state.requestedTxns[inv.Hash]; exists {
delete(state.requestedTxns, inv.Hash)
delete(sm.requestedTxns, inv.Hash)
}
}
}
}

// haveInventory returns whether or not the inventory represented by the passed
// inventory vector is known. This includes checking all of the various places
// inventory can be when it is in different states such as blocks that are part
Expand Down Expand Up @@ -1293,6 +1326,9 @@ out:
case *headersMsg:
sm.handleHeadersMsg(msg)

case *notFoundMsg:
sm.handleNotFoundMsg(msg)

case *donePeerMsg:
sm.handleDonePeerMsg(msg.peer)

Expand Down Expand Up @@ -1490,6 +1526,18 @@ func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer
sm.msgChan <- &headersMsg{headers: headers, peer: peer}
}

// QueueNotFound adds the passed notfound message and peer to the block handling
// queue.
func (sm *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) {
// No channel handling here because peers do not need to block on
// reject messages.
if atomic.LoadInt32(&sm.shutdown) != 0 {
return
}

sm.msgChan <- &notFoundMsg{notFound: notFound, peer: peer}
}

// DonePeer informs the blockmanager that a peer has disconnected.
func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) {
// Ignore if we are shutting down.
Expand Down
57 changes: 51 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,14 @@ func (sp *serverPeer) pushAddrMsg(addresses []*wire.NetAddress) {
// threshold, a warning is logged including the reason provided. Further, if
// the score is above the ban threshold, the peer will be banned and
// disconnected.
func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) bool {
// No warning is logged and no score is calculated if banning is disabled.
if cfg.DisableBanning {
return
return false
}
if sp.isWhitelisted {
peerLog.Debugf("Misbehaving whitelisted peer %s: %s", sp, reason)
return
return false
}

warnThreshold := cfg.BanThreshold >> 1
Expand All @@ -383,7 +383,7 @@ func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
peerLog.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+
"it was not increased this time", sp, reason, score)
}
return
return false
}
score := sp.banScore.Increase(persistent, transient)
if score > warnThreshold {
Expand All @@ -394,8 +394,10 @@ func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
sp)
sp.server.BanPeer(sp)
sp.Disconnect()
return true
}
}
return false
}

// hasServices returns whether or not the provided advertised service flags have
Expand Down Expand Up @@ -498,7 +500,9 @@ func (sp *serverPeer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) {
// The ban score accumulates and passes the ban threshold if a burst of
// mempool messages comes from a peer. The score decays each minute to
// half of its value.
sp.addBanScore(0, 33, "mempool")
if sp.addBanScore(0, 33, "mempool") {
return
}

// Generate inventory message with the available transactions in the
// transaction memory pool. Limit it to the max allowed inventory
Expand Down Expand Up @@ -638,7 +642,9 @@ func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
// bursts of small requests are not penalized as that would potentially ban
// peers performing IBD.
// This incremental score decays each minute to half of its value.
sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata")
if sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") {
return
}

// We wait on this wait channel periodically to prevent queuing
// far more data than we can send in a reasonable time, wasting memory.
Expand Down Expand Up @@ -1304,6 +1310,44 @@ func (sp *serverPeer) OnWrite(_ *peer.Peer, bytesWritten int, msg wire.Message,
sp.server.AddBytesSent(uint64(bytesWritten))
}

// OnNotFound is invoked when a peer sends a notfound message.
func (sp *serverPeer) OnNotFound(p *peer.Peer, msg *wire.MsgNotFound) {
if !sp.Connected() {
return
}

var numBlocks, numTxns uint32
for _, inv := range msg.InvList {
switch inv.Type {
case wire.InvTypeBlock:
numBlocks++
case wire.InvTypeTx:
numTxns++
default:
peerLog.Debugf("Invalid inv type '%d' in notfound message from %s",
inv.Type, sp)
sp.Disconnect()
return
}
}
if numBlocks > 0 {
blockStr := pickNoun(uint64(numBlocks), "block", "blocks")
reason := fmt.Sprintf("%d %v not found", numBlocks, blockStr)
if sp.addBanScore(20*numBlocks, 0, reason) {
return
}
}
if numTxns > 0 {
txStr := pickNoun(uint64(numTxns), "transaction", "transactions")
reason := fmt.Sprintf("%d %v not found", numBlocks, txStr)
if sp.addBanScore(0, 10*numTxns, reason) {
return
}
}

sp.server.syncManager.QueueNotFound(msg, p)
}

// randomUint16Number returns a random uint16 in a specified input range. Note
// that the range is in zeroth ordering; if you pass it 1800, you will get
// values from 0 to 1800.
Expand Down Expand Up @@ -1998,6 +2042,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config {
OnAddr: sp.OnAddr,
OnRead: sp.OnRead,
OnWrite: sp.OnWrite,
OnNotFound: sp.OnNotFound,

// Note: The reference client currently bans peers that send alerts
// not signed with its key. We could verify against their key, but
Expand Down

0 comments on commit 24db7d7

Please sign in to comment.