refactor(p2p)!: Refactor PeerSet to eliminate data races & improve performance (cometbft#2246)

Original PR: cometbft#2159

## Breaking changes

- `[p2p]` Rename `IPeerSet#List` to `Copy`, add `Random`, `ForEach`
methods.
   Rename `PeerSet#List` to `Copy`, add `Random`, `ForEach` methods.

Fixes cometbft#2158
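
To make the rename concrete, here is a minimal migration sketch (not part of the commit itself): `broadcastToPeers` and the envelope value are hypothetical, and `Random`'s exact signature is not shown in this diff, so only `Copy` and `ForEach` appear below.

```go
package example

import "github.com/cometbft/cometbft/p2p"

// broadcastToPeers is a hypothetical helper showing the post-rename call
// patterns used throughout this diff.
func broadcastToPeers(sw *p2p.Switch, e p2p.Envelope) {
	// Copy (formerly List) returns a snapshot slice, useful when the caller
	// needs indexing or len().
	peers := sw.Peers().Copy()
	if len(peers) > 0 {
		_ = peers[0].Send(e)
	}

	// ForEach iterates without handing the set's internal slice to the caller.
	sw.Peers().ForEach(func(peer p2p.Peer) {
		_ = peer.Send(e)
	})
}
```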

## Performance improvement

This change makes `PeerSet.Remove` much more efficient by using idiomatic Go
re-slicing instead of rebuilding the peer list from scratch on every single
removal. While here, it also fixes cometbft#2158, a bug caused by an
abstraction that returns a stale slice to its caller in `Switch.OnStop`.
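
As a rough illustration of the re-slicing technique (a simplified sketch, not the actual `PeerSet` code; the real implementation may preserve peer ordering differently), a removal can shrink the backing slice in place instead of allocating a new list:

```go
package example

// Peer is a minimal stand-in for the real p2p.Peer interface (assumption).
type Peer interface{ ID() string }

type peerList struct {
	lookup map[string]int // peer ID -> index into the backing slice
	list   []Peer
}

func (pl *peerList) remove(id string) bool {
	i, ok := pl.lookup[id]
	if !ok {
		return false
	}
	delete(pl.lookup, id)
	last := len(pl.list) - 1
	if i != last {
		// Move the last peer into the vacated slot and fix up its index.
		pl.list[i] = pl.list[last]
		pl.lookup[pl.list[i].ID()] = i
	}
	pl.list[last] = nil      // drop the reference so the removed peer can be collected
	pl.list = pl.list[:last] // shrink in place; no fresh list allocation
	return true
}
```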

Benchmark results:

```shell
$ benchstat before.txt after.txt
name                 old time/op    new time/op    delta
PeerSetRemoveOne-8     90.5µs ± 4%    95.9µs ±13%     ~     (p=0.218 n=10+10)
PeerSetRemoveMany-8    1.58ms ± 4%    1.50ms ± 1%   -4.98%  (p=0.000 n=10+8)

name                 old alloc/op   new alloc/op   delta
PeerSetRemoveOne-8     8.48kB ± 0%    7.92kB ± 0%   -6.60%  (p=0.000 n=10+10)
PeerSetRemoveMany-8     149kB ± 0%      65kB ± 0%  -56.44%  (p=0.000 n=10+10)

name                 old allocs/op  new allocs/op  delta
PeerSetRemoveOne-8       85.0 ± 0%      73.0 ± 0%  -14.12%  (p=0.000 n=10+10)
PeerSetRemoveMany-8     1.32k ± 0%     1.22k ± 0%   -7.51%  (p=0.000 n=10+10)
```

These savings compound when peers are removed frequently over long periods,
and they can help mitigate DoS vectors.
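
For reference, the numbers above come from benchmarks of roughly this shape (a hypothetical reconstruction, assuming `Remove` takes the peer to delete; `newTestPeerSet` is an assumed helper, not a function from the repository):

```go
// Hypothetical reconstruction of the benchmark shape behind PeerSetRemoveMany.
func BenchmarkPeerSetRemoveMany(b *testing.B) {
	for i := 0; i < b.N; i++ {
		ps := newTestPeerSet(100) // assumed helper that builds a set of 100 peers
		for _, peer := range ps.Copy() {
			ps.Remove(peer)
		}
	}
}
```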

Fixes cometbft#2157

---------

Co-authored-by: Emmanuel T Odeke <emmanuel@orijtech.com>
Co-authored-by: Sergio Mena <sergio@informal.systems>
3 people authored Feb 12, 2024
1 parent 460f58c commit e1ee71c
Showing 18 changed files with 192 additions and 143 deletions.
3 changes: 3 additions & 0 deletions .changelog/unreleased/breaking-changes/2246-p2p-peerset.md
@@ -0,0 +1,3 @@
- `[p2p]` Rename `IPeerSet#List` to `Copy`, add `Random`, `ForEach` methods.
Rename `PeerSet#List` to `Copy`, add `Random`, `ForEach` methods.
([\#2246](https://github.com/cometbft/cometbft/pull/2246))
@@ -0,0 +1 @@
- `[p2p]` make `PeerSet.Remove` more efficient (Author: @odeke-em) [\#2246](https://github.com/cometbft/cometbft/pull/2246)
6 changes: 3 additions & 3 deletions internal/consensus/byzantine_test.go
@@ -146,7 +146,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
require.NoError(t, err)
prevote2, err := bcs.signVote(types.PrevoteType, nil, types.PartSetHeader{}, nil)
require.NoError(t, err)
peerList := reactors[byzantineNode].Switch.Peers().List()
peerList := reactors[byzantineNode].Switch.Peers().Copy()
bcs.Logger.Info("Getting peer list", "peers", peerList)
// send two votes to all peers (1st to one half, 2nd to another half)
for i, peer := range peerList {
@@ -403,7 +403,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
// byz proposer sends one block to peers[0]
// and the other block to peers[1] and peers[2].
// note peers and switches order don't match.
peers := switches[0].Peers().List()
peers := switches[0].Peers().Copy()

// partition A
ind0 := getSwitchIndex(switches, peers[0])
@@ -492,7 +492,7 @@ func byzantineDecideProposalFunc(ctx context.Context, t *testing.T, height int64
block2Hash := block2.Hash()

// broadcast conflicting proposals/block parts to peers
peers := sw.Peers().List()
peers := sw.Peers().Copy()
t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers))
for i, peer := range peers {
if i < len(peers)/2 {
2 changes: 1 addition & 1 deletion internal/consensus/invalid_test.go
@@ -100,7 +100,7 @@ func invalidDoPrevoteFunc(t *testing.T, cs *State, sw *p2p.Switch, pv types.Priv
precommit.ExtensionSignature = p.ExtensionSignature
cs.privValidator = nil // disable priv val so we don't do normal votes

peers := sw.Peers().List()
peers := sw.Peers().Copy()
for _, peer := range peers {
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
peer.Send(p2p.Envelope{
6 changes: 3 additions & 3 deletions internal/consensus/reactor.go
@@ -489,7 +489,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
})
/*
// TODO: Make this broadcast more selective.
for _, peer := range conR.Switch.Peers().List() {
for _, peer := range conR.Switch.Peers().Copy() {
ps, ok := peer.Get(PeerStateKey).(*PeerState)
if !ok {
panic(fmt.Sprintf("Peer %v has no state", peer))
@@ -1039,13 +1039,13 @@ func (conR *Reactor) String() string {
func (conR *Reactor) StringIndented(indent string) string {
s := "ConsensusReactor{\n"
s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n"
for _, peer := range conR.Switch.Peers().List() {
conR.Switch.Peers().ForEach(func(peer p2p.Peer) {
ps, ok := peer.Get(types.PeerStateKey).(*PeerState)
if !ok {
panic(fmt.Sprintf("Peer %v has no state", peer))
}
s += indent + " " + ps.StringIndented(indent+" ") + "\n"
}
})
s += indent + "}"
return s
}
2 changes: 1 addition & 1 deletion internal/consensus/reactor_test.go
@@ -427,7 +427,7 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
})

// Get peer
peer := reactors[1].Switch.Peers().List()[0]
peer := reactors[1].Switch.Peers().Copy()[0]
// Get peer state
ps := peer.Get(types.PeerStateKey).(*PeerState)

14 changes: 7 additions & 7 deletions internal/evidence/reactor_test.go
@@ -54,7 +54,7 @@ func TestReactorBroadcastEvidence(t *testing.T) {

// set the peer height on each reactor
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
for _, peer := range r.Switch.Peers().Copy() {
ps := peerState{height}
peer.Set(types.PeerStateKey, ps)
}
@@ -85,14 +85,14 @@ func TestReactorSelectiveBroadcast(t *testing.T) {

// set the peer height on each reactor
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
for _, peer := range r.Switch.Peers().Copy() {
ps := peerState{height1}
peer.Set(types.PeerStateKey, ps)
}
}

// update the first reactor peer's height to be very small
peer := reactors[0].Switch.Peers().List()[0]
peer := reactors[0].Switch.Peers().Copy()[0]
ps := peerState{height2}
peer.Set(types.PeerStateKey, ps)

@@ -103,7 +103,7 @@ func TestReactorSelectiveBroadcast(t *testing.T) {
waitForEvidence(t, evList[:numEvidence/2-1], []*evidence.Pool{pools[1]})

// peers should still be connected
peers := reactors[1].Switch.Peers().List()
peers := reactors[1].Switch.Peers().Copy()
assert.Len(t, peers, 1)
}

@@ -134,11 +134,11 @@ func TestReactorsGossipNoCommittedEvidence(t *testing.T) {

time.Sleep(100 * time.Millisecond)

peer := reactors[0].Switch.Peers().List()[0]
peer := reactors[0].Switch.Peers().Copy()[0]
ps := peerState{height - 2}
peer.Set(types.PeerStateKey, ps)

peer = reactors[1].Switch.Peers().List()[0]
peer = reactors[1].Switch.Peers().Copy()[0]
ps = peerState{height}
peer.Set(types.PeerStateKey, ps)

@@ -176,7 +176,7 @@ func TestReactorsGossipNoCommittedEvidence(t *testing.T) {

// now update the state of the second reactor
pools[1].Update(state, types.EvidenceList{})
peer = reactors[0].Switch.Peers().List()[0]
peer = reactors[0].Switch.Peers().Copy()[0]
ps = peerState{height}
peer.Set(types.PeerStateKey, ps)

28 changes: 14 additions & 14 deletions mempool/reactor_test.go
@@ -54,7 +54,7 @@ func TestReactorBroadcastTxsMessage(t *testing.T) {
}
}()
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
for _, peer := range r.Switch.Peers().Copy() {
peer.Set(types.PeerStateKey, peerState{1})
}
}
@@ -78,7 +78,7 @@ func TestReactorConcurrency(t *testing.T) {
}
}()
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
for _, peer := range r.Switch.Peers().Copy() {
peer.Set(types.PeerStateKey, peerState{1})
}
}
@@ -135,7 +135,7 @@ func TestReactorNoBroadcastToSender(t *testing.T) {
}
}()
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
for _, peer := range r.Switch.Peers().Copy() {
peer.Set(types.PeerStateKey, peerState{1})
}
}
@@ -168,7 +168,7 @@ func TestReactor_MaxTxBytes(t *testing.T) {
}
}()
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
for _, peer := range r.Switch.Peers().Copy() {
peer.Set(types.PeerStateKey, peerState{1})
}
}
@@ -210,7 +210,7 @@ func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {

// stop peer
sw := reactors[1].Switch
sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason"))
sw.StopPeerForError(sw.Peers().Copy()[0], errors.New("some reason"))

// check that we are not leaking any go-routines
// i.e. broadcastTxRoutine finishes when peer is stopped
@@ -285,7 +285,7 @@ func TestReactorTxSendersMultiNode(t *testing.T) {
}
}()
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
for _, peer := range r.Switch.Peers().Copy() {
peer.Set(types.PeerStateKey, peerState{1})
}
}
@@ -349,7 +349,7 @@ func TestMempoolFIFOWithParallelCheckTx(t *testing.T) {
}
}()
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
for _, peer := range r.Switch.Peers().Copy() {
peer.Set(types.PeerStateKey, peerState{1})
}
}
@@ -386,7 +386,7 @@ func TestMempoolReactorMaxActiveOutboundConnections(t *testing.T) {
}
}()
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
for _, peer := range r.Switch.Peers().Copy() {
peer.Set(types.PeerStateKey, peerState{1})
}
}
@@ -403,7 +403,7 @@ func TestMempoolReactorMaxActiveOutboundConnections(t *testing.T) {
}

// Disconnect the second reactor from the first reactor.
firstPeer := reactors[0].Switch.Peers().List()[0]
firstPeer := reactors[0].Switch.Peers().Copy()[0]
reactors[0].Switch.StopPeerGracefully(firstPeer)

// Now the third reactor should start receiving transactions from the first reactor; the fourth
@@ -432,13 +432,13 @@ func TestMempoolReactorMaxActiveOutboundConnectionsNoDuplicate(t *testing.T) {
}
}()
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
for _, peer := range r.Switch.Peers().Copy() {
peer.Set(types.PeerStateKey, peerState{1})
}
}

// Disconnect the second reactor from the third reactor.
pCon1_2 := reactors[1].Switch.Peers().List()[1]
pCon1_2 := reactors[1].Switch.Peers().Copy()[1]
reactors[1].Switch.StopPeerGracefully(pCon1_2)

// Add a bunch transactions to the first reactor.
@@ -453,7 +453,7 @@ func TestMempoolReactorMaxActiveOutboundConnectionsNoDuplicate(t *testing.T) {
}

// Disconnect the second reactor from the first reactor.
pCon0_1 := reactors[0].Switch.Peers().List()[0]
pCon0_1 := reactors[0].Switch.Peers().Copy()[0]
reactors[0].Switch.StopPeerGracefully(pCon0_1)

// Now the third reactor should start receiving transactions from the first reactor and
@@ -480,7 +480,7 @@ func TestMempoolReactorMaxActiveOutboundConnectionsStar(t *testing.T) {
}
}()
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
for _, peer := range r.Switch.Peers().Copy() {
peer.Set(types.PeerStateKey, peerState{1})
}
}
@@ -498,7 +498,7 @@ func TestMempoolReactorMaxActiveOutboundConnectionsStar(t *testing.T) {
}

// Disconnect the second reactor from the first reactor.
firstPeer := reactors[0].Switch.Peers().List()[0]
firstPeer := reactors[0].Switch.Peers().Copy()[0]
reactors[0].Switch.StopPeerGracefully(firstPeer)

// Now the third reactor should start receiving transactions from the first reactor; the fourth
36 changes: 21 additions & 15 deletions p2p/peer.go
@@ -120,9 +120,8 @@ type peer struct {
// User data
Data *cmap.CMap

metrics *Metrics
metricsTicker *time.Ticker
mlc *metricsLabelCache
metrics *Metrics
mlc *metricsLabelCache

// When removal of a peer fails, we set this flag
removalAttemptFailed bool
@@ -142,13 +141,12 @@ func newPeer(
options ...PeerOption,
) *peer {
p := &peer{
peerConn: pc,
nodeInfo: nodeInfo,
channels: nodeInfo.(DefaultNodeInfo).Channels,
Data: cmap.NewCMap(),
metricsTicker: time.NewTicker(metricsTickerDuration),
metrics: NopMetrics(),
mlc: mlc,
peerConn: pc,
nodeInfo: nodeInfo,
channels: nodeInfo.(DefaultNodeInfo).Channels,
Data: cmap.NewCMap(),
metrics: NopMetrics(),
mlc: mlc,
}

p.mconn = createMConnection(
@@ -202,17 +200,14 @@ func (p *peer) OnStart() error {

// FlushStop mimics OnStop but additionally ensures that all successful
// .Send() calls will get flushed before closing the connection.
//
// NOTE: it is not safe to call this method more than once.
func (p *peer) FlushStop() {
p.metricsTicker.Stop()
p.BaseService.OnStop()
p.mconn.FlushStop() // stop everything and close the conn
}

// OnStop implements BaseService.
func (p *peer) OnStop() {
p.metricsTicker.Stop()
p.BaseService.OnStop()
if err := p.mconn.Stop(); err != nil { // stop everything and close the conn
p.Logger.Debug("Error while stopping peer", "err", err)
}
@@ -256,12 +251,16 @@ func (p *peer) Status() cmtconn.ConnectionStatus {

// Send msg bytes to the channel identified by chID byte. Returns false if the
// send queue is full after timeout, specified by MConnection.
//
// thread safe.
func (p *peer) Send(e Envelope) bool {
return p.send(e.ChannelID, e.Message, p.mconn.Send)
}

// TrySend msg bytes to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
//
// thread safe.
func (p *peer) TrySend(e Envelope) bool {
return p.send(e.ChannelID, e.Message, p.mconn.TrySend)
}
@@ -294,11 +293,15 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo
}

// Get the data for a given key.
//
// thread safe.
func (p *peer) Get(key string) interface{} {
return p.Data.Get(key)
}

// Set sets the data for the given key.
//
// thread safe.
func (p *peer) Set(key string, data interface{}) {
p.Data.Set(key, data)
}
@@ -367,9 +370,12 @@ func PeerMetrics(metrics *Metrics) PeerOption {
}

func (p *peer) metricsReporter() {
metricsTicker := time.NewTicker(metricsTickerDuration)
defer metricsTicker.Stop()

for {
select {
case <-p.metricsTicker.C:
case <-metricsTicker.C:
status := p.mconn.Status()
var sendQueueSize float64
for _, chStatus := range status.Channels {