Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move net_processing implementation details out of header #20811

Merged
merged 5 commits into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1410,8 +1410,8 @@ bool AppInitMain(const util::Ref& context, NodeContext& node, interfaces::BlockA
ChainstateManager& chainman = *Assert(node.chainman);

assert(!node.peerman);
node.peerman = std::make_unique<PeerManager>(chainparams, *node.connman, node.banman.get(),
*node.scheduler, chainman, *node.mempool, ignores_incoming_txs);
node.peerman = PeerManager::make(chainparams, *node.connman, node.banman.get(),
*node.scheduler, chainman, *node.mempool, ignores_incoming_txs);
RegisterValidationInterface(node.peerman.get());

// sanitize comments per BIP-0014, format user agent and check total size
Expand Down
23 changes: 21 additions & 2 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -769,11 +769,30 @@ class CNode
class NetEventsInterface
{
public:
virtual bool ProcessMessages(CNode* pnode, std::atomic<bool>& interrupt) = 0;
virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_sendProcessing) = 0;
/** Initialize a peer (setup state, queue any initial messages) */
Copy link
Contributor

@jnewbery jnewbery Jan 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's a big deal, but if you're going to add these comments to the interface class, I think there are some potential improvements:

  • "queue any initial messages" is only true for the version message for outbound peers. Since it's not universally true, it probably doesn't belong in the description of the interface.
  • SendMessages() return value is not used to indicate if more work is done. The function always returns true and that value is discarded by the caller. A future commit could change the function to return void.
  • ProcessMessages() does return a bool that indicates whether there's more work.

Eventually, these functions could all be updated to just take a single CNode& argument, which would be a very clean interface (#20228 removes the update_connection_time arg from FinalizeNode(), and I have another branch that eliminates interrupt).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SendMessages lost its new lock annotation in the rebase, so I've added the doc from ProcessMessages return value.

I think "any initial messages" covers nothing, just a version message, or anything else we might think up fine. The return value of SendMessages is "used" in the unit tests, so haven't removed that in this PR. Looking at the CNode& thing in #20758

virtual void InitializeNode(CNode* pnode) = 0;

/** Handle removal of a peer (clear state) */
virtual void FinalizeNode(const CNode& node, bool& update_connection_time) = 0;

/**
* Process protocol messages received from a given node
*
* @param[in] pnode The node which we have received messages from.
* @param[in] interrupt Interrupt condition for processing threads
* @return True if there is more work to be done
*/
virtual bool ProcessMessages(CNode* pnode, std::atomic<bool>& interrupt) = 0;

/**
* Send queued protocol messages to a given node.
*
* @param[in] pnode The node which we are sending messages to.
* @return True if there is more work to be done
*/
virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_sendProcessing) = 0;


protected:
/**
* Protected destructor so that instances can only be deleted by derived classes.
Expand Down
264 changes: 229 additions & 35 deletions src/net_processing.cpp

Large diffs are not rendered by default.

221 changes: 19 additions & 202 deletions src/net_processing.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,13 @@
#ifndef BITCOIN_NET_PROCESSING_H
#define BITCOIN_NET_PROCESSING_H

#include <consensus/params.h>
#include <net.h>
#include <sync.h>
#include <txrequest.h>
#include <validationinterface.h>

class BlockTransactionsRequest;
class BlockValidationState;
class CBlockHeader;
class CChainParams;
class CTxMemPool;
class ChainstateManager;
class TxValidationState;

extern RecursiveMutex cs_main;
extern RecursiveMutex g_cs_orphans;
jnewbery marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -39,216 +33,39 @@ struct CNodeStateStats {
std::vector<int> vHeightInFlight;
};

/**
* Data structure for an individual peer. This struct is not protected by
* cs_main since it does not contain validation-critical data.
*
* Memory is owned by shared pointers and this object is destructed when
* the refcount drops to zero.
*
* Mutexes inside this struct must not be held when locking m_peer_mutex.
*
* TODO: move most members from CNodeState to this structure.
* TODO: move remaining application-layer data members from CNode to this structure.
*/
struct Peer {
/** Same id as the CNode object for this peer */
const NodeId m_id{0};

/** Protects misbehavior data members */
Mutex m_misbehavior_mutex;
/** Accumulated misbehavior score for this peer */
int m_misbehavior_score GUARDED_BY(m_misbehavior_mutex){0};
/** Whether this peer should be disconnected and marked as discouraged (unless it has the noban permission). */
bool m_should_discourage GUARDED_BY(m_misbehavior_mutex){false};

/** Protects block inventory data members */
Mutex m_block_inv_mutex;
/** List of blocks that we'll announce via an `inv` message.
* There is no final sorting before sending, as they are always sent
* immediately and in the order requested. */
std::vector<uint256> m_blocks_for_inv_relay GUARDED_BY(m_block_inv_mutex);
/** Unfiltered list of blocks that we'd like to announce via a `headers`
* message. If we can't announce via a `headers` message, we'll fall back to
* announcing via `inv`. */
std::vector<uint256> m_blocks_for_headers_relay GUARDED_BY(m_block_inv_mutex);
/** The final block hash that we sent in an `inv` message to this peer.
* When the peer requests this block, we send an `inv` message to trigger
* the peer to request the next sequence of block hashes.
* Most peers use headers-first syncing, which doesn't use this mechanism */
uint256 m_continuation_block GUARDED_BY(m_block_inv_mutex) {};

/** This peer's reported block height when we connected */
std::atomic<int> m_starting_height{-1};

/** Set of txids to reconsider once their parent transactions have been accepted **/
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);

/** Protects m_getdata_requests **/
Mutex m_getdata_requests_mutex;
/** Work queue of items requested by this peer **/
std::deque<CInv> m_getdata_requests GUARDED_BY(m_getdata_requests_mutex);

explicit Peer(NodeId id) : m_id(id) {}
};

using PeerRef = std::shared_ptr<Peer>;

class PeerManager final : public CValidationInterface, public NetEventsInterface {
class PeerManager : public CValidationInterface, public NetEventsInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe consider renaming
PeerManager -> PeerManagerInterface
PeerManagerImpl -> PeerManager
for consistency with CValidationInterface and NetEventsInterface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way I look at it is that Interface is for defining a class where some other module is going to implement the methods that do the actual work; but the work to fulfill the PeerManager API really is done by net_processing; it's just an implementation detail that the methods are virtual and inheritance is used -- the unique_ptr<Impl> m_impl; approach used in txrequest could equally well have been used too. So I think PeerManager* is the right name for what is being exposed to other modules here.

{
public:
PeerManager(const CChainParams& chainparams, CConnman& connman, BanMan* banman,
CScheduler& scheduler, ChainstateManager& chainman, CTxMemPool& pool,
bool ignore_incoming_txs);
static std::unique_ptr<PeerManager> make(const CChainParams& chainparams, CConnman& connman, BanMan* banman,
CScheduler& scheduler, ChainstateManager& chainman, CTxMemPool& pool,
bool ignore_incoming_txs);
virtual ~PeerManager() { }

/**
* Overridden from CValidationInterface.
*/
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) override;
void BlockDisconnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex* pindex) override;
/**
* Overridden from CValidationInterface.
*/
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
/**
* Overridden from CValidationInterface.
*/
void BlockChecked(const CBlock& block, const BlockValidationState& state) override;
/**
* Overridden from CValidationInterface.
*/
void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& pblock) override;

/** Initialize a peer by adding it to mapNodeState and pushing a message requesting its version */
void InitializeNode(CNode* pnode) override;
/** Handle removal of a peer by updating various state and removing it from mapNodeState */
void FinalizeNode(const CNode& node, bool& fUpdateConnectionTime) override;
/**
* Process protocol messages received from a given node
ajtowns marked this conversation as resolved.
Show resolved Hide resolved
*
* @param[in] pfrom The node which we have received messages from.
* @param[in] interrupt Interrupt condition for processing threads
*/
bool ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt) override;
/**
* Send queued protocol messages to be sent to a give node.
*
* @param[in] pto The node which we are sending messages to.
* @return True if there is more work to be done
*/
bool SendMessages(CNode* pto) override EXCLUSIVE_LOCKS_REQUIRED(pto->cs_sendProcessing);
/** Get statistics from node state */
virtual bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) = 0;

/** Consider evicting an outbound peer based on the amount of time they've been behind our tip */
void ConsiderEviction(CNode& pto, int64_t time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
/** Evict extra outbound peers. If we think our tip may be stale, connect to an extra outbound */
void CheckForStaleTipAndEvictPeers();
/** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */
void EvictExtraOutboundPeers(int64_t time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
/** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */
void ReattemptInitialBroadcast(CScheduler& scheduler) const;
/** Whether this node ignores txs received over p2p. */
virtual bool IgnoresIncomingTxs() = 0;

/** Process a single message from a peer. Public for fuzz testing */
void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv,
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc);
/** Set the best height */
virtual void SetBestHeight(int height) = 0;

/**
* Increment peer's misbehavior score. If the new value >= DISCOURAGEMENT_THRESHOLD, mark the node
* to be discouraged, meaning the peer might be disconnected and added to the discouragement filter.
* Public for unit testing.
*/
void Misbehaving(const NodeId pnode, const int howmuch, const std::string& message);

/** Get statistics from node state */
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats);

/** Set the best height */
void SetBestHeight(int height) { m_best_height = height; };

/** Whether this node ignores txs received over p2p. */
bool IgnoresIncomingTxs() { return m_ignore_incoming_txs; };

private:
/** Get a shared pointer to the Peer object.
* May return an empty shared_ptr if the Peer object can't be found. */
PeerRef GetPeerRef(NodeId id) const;

/** Get a shared pointer to the Peer object and remove it from m_peer_map.
* May return an empty shared_ptr if the Peer object can't be found. */
PeerRef RemovePeer(NodeId id);

/**
* Potentially mark a node discouraged based on the contents of a BlockValidationState object
*
* @param[in] via_compact_block this bool is passed in because net_processing should
* punish peers differently depending on whether the data was provided in a compact
* block message or not. If the compact block had a valid header, but contained invalid
* txs, the peer should not be punished. See BIP 152.
*
* @return Returns true if the peer was punished (probably disconnected)
*/
bool MaybePunishNodeForBlock(NodeId nodeid, const BlockValidationState& state,
bool via_compact_block, const std::string& message = "");
virtual void Misbehaving(const NodeId pnode, const int howmuch, const std::string& message) = 0;

/**
* Potentially disconnect and discourage a node based on the contents of a TxValidationState object
*
* @return Returns true if the peer was punished (probably disconnected)
*/
bool MaybePunishNodeForTx(NodeId nodeid, const TxValidationState& state, const std::string& message = "");

/** Maybe disconnect a peer and discourage future connections from its address.
*
* @param[in] pnode The node to check.
* @return True if the peer was marked for disconnection in this function
* Evict extra outbound peers. If we think our tip may be stale, connect to an extra outbound.
* Public for unit testing.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could have been better to comment in commit message the re-ordering logic followed "Move method exposed for test-only at the end of public method space declaration" :) ?

*/
bool MaybeDiscourageAndDisconnect(CNode& pnode);
virtual void CheckForStaleTipAndEvictPeers() = 0;

void ProcessOrphanTx(std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans);
/** Process a single headers message from a peer. */
void ProcessHeadersMessage(CNode& pfrom, const Peer& peer,
const std::vector<CBlockHeader>& headers,
bool via_compact_block);

void SendBlockTransactions(CNode& pfrom, const CBlock& block, const BlockTransactionsRequest& req);

/** Register with TxRequestTracker that an INV has been received from a
* peer. The announcement parameters are decided in PeerManager and then
* passed to TxRequestTracker. */
void AddTxAnnouncement(const CNode& node, const GenTxid& gtxid, std::chrono::microseconds current_time)
EXCLUSIVE_LOCKS_REQUIRED(::cs_main);

/** Send a version message to a peer */
void PushNodeVersion(CNode& pnode, int64_t nTime);

const CChainParams& m_chainparams;
CConnman& m_connman;
/** Pointer to this node's banman. May be nullptr - check existence before dereferencing. */
BanMan* const m_banman;
ChainstateManager& m_chainman;
CTxMemPool& m_mempool;
TxRequestTracker m_txrequest GUARDED_BY(::cs_main);

/** The height of the best chain */
std::atomic<int> m_best_height{-1};

int64_t m_stale_tip_check_time; //!< Next time to check for stale tip

/** Whether this node is running in blocks only mode */
const bool m_ignore_incoming_txs;

/** Whether we've completed initial sync yet, for determining when to turn
* on extra block-relay-only peers. */
bool m_initial_sync_finished{false};

/** Protects m_peer_map. This mutex must not be locked while holding a lock
* on any of the mutexes inside a Peer object. */
mutable Mutex m_peer_mutex;
/**
* Map of all Peer objects, keyed by peer id. This map is protected
* by the m_peer_mutex. Once a shared pointer reference is
* taken, the lock may be released. Individual fields are protected by
* their own locks.
*/
std::map<NodeId, PeerRef> m_peer_map GUARDED_BY(m_peer_mutex);
/** Process a single message from a peer. Public for fuzz testing */
virtual void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv,
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) = 0;
};

/** Relay transaction to every node */
Expand Down
16 changes: 8 additions & 8 deletions src/test/denialofservice_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
{
const CChainParams& chainparams = Params();
auto connman = MakeUnique<CConnman>(0x1337, 0x1337);
auto peerLogic = std::make_unique<PeerManager>(chainparams, *connman, nullptr, *m_node.scheduler,
*m_node.chainman, *m_node.mempool, false);
auto peerLogic = PeerManager::make(chainparams, *connman, nullptr, *m_node.scheduler,
*m_node.chainman, *m_node.mempool, false);

// Mock an outbound peer
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
Expand Down Expand Up @@ -150,8 +150,8 @@ BOOST_AUTO_TEST_CASE(stale_tip_peer_management)
{
const CChainParams& chainparams = Params();
auto connman = MakeUnique<CConnmanTest>(0x1337, 0x1337);
auto peerLogic = std::make_unique<PeerManager>(chainparams, *connman, nullptr, *m_node.scheduler,
*m_node.chainman, *m_node.mempool, false);
auto peerLogic = PeerManager::make(chainparams, *connman, nullptr, *m_node.scheduler,
*m_node.chainman, *m_node.mempool, false);

constexpr int max_outbound_full_relay = MAX_OUTBOUND_FULL_RELAY_CONNECTIONS;
CConnman::Options options;
Expand Down Expand Up @@ -224,8 +224,8 @@ BOOST_AUTO_TEST_CASE(peer_discouragement)
const CChainParams& chainparams = Params();
auto banman = MakeUnique<BanMan>(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME);
auto connman = MakeUnique<CConnman>(0x1337, 0x1337);
auto peerLogic = std::make_unique<PeerManager>(chainparams, *connman, banman.get(), *m_node.scheduler,
*m_node.chainman, *m_node.mempool, false);
auto peerLogic = PeerManager::make(chainparams, *connman, banman.get(), *m_node.scheduler,
*m_node.chainman, *m_node.mempool, false);

banman->ClearBanned();
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
Expand Down Expand Up @@ -271,8 +271,8 @@ BOOST_AUTO_TEST_CASE(DoS_bantime)
const CChainParams& chainparams = Params();
auto banman = MakeUnique<BanMan>(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME);
auto connman = MakeUnique<CConnman>(0x1337, 0x1337);
auto peerLogic = std::make_unique<PeerManager>(chainparams, *connman, banman.get(), *m_node.scheduler,
*m_node.chainman, *m_node.mempool, false);
auto peerLogic = PeerManager::make(chainparams, *connman, banman.get(), *m_node.scheduler,
*m_node.chainman, *m_node.mempool, false);

banman->ClearBanned();
int64_t nStartTime = GetTime();
Expand Down
6 changes: 3 additions & 3 deletions src/test/util/setup_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,9 @@ TestingSetup::TestingSetup(const std::string& chainName, const std::vector<const

m_node.banman = MakeUnique<BanMan>(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME);
m_node.connman = MakeUnique<CConnman>(0x1337, 0x1337); // Deterministic randomness for tests.
m_node.peerman = std::make_unique<PeerManager>(chainparams, *m_node.connman, m_node.banman.get(),
*m_node.scheduler, *m_node.chainman, *m_node.mempool,
false);
m_node.peerman = PeerManager::make(chainparams, *m_node.connman, m_node.banman.get(),
*m_node.scheduler, *m_node.chainman, *m_node.mempool,
false);
{
CConnman::Options options;
options.m_msgproc = m_node.peerman.get();
Expand Down
2 changes: 1 addition & 1 deletion test/sanitizer_suppressions/tsan
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mutex:CConnman::ThreadOpenConnections
mutex:CConnman::ThreadOpenAddedConnections
mutex:CConnman::SocketHandler
mutex:UpdateTip
mutex:PeerManager::UpdatedBlockTip
mutex:PeerManagerImpl::UpdatedBlockTip
mutex:g_best_block_mutex

# race (TODO fix)
Expand Down