
Blockchain-v2 reactor second PR #4361

Closed
wants to merge 29 commits into master from brapse/blockchain-v2-riri-reactor-2

Conversation

brapse
Contributor

@brapse brapse commented Jan 31, 2020

The work includes the reactor which ties together all the separate routines involved in the design of the blockchain v2 refactor. This PR replaces #4067, which got far too large and messy after a failed attempt to rebase.

  • Referenced an issue explaining the need for the change
  • Updated all relevant documentation in docs
  • Updated all code comments where relevant
  • Wrote tests
  • Updated CHANGELOG_PENDING.md

	+ A cleaner copy of the work done in #4067, which fell too far behind and was a nightmare to rebase.
	+ The work includes the reactor which ties together all the separate routines involved in the design of the blockchain v2 refactor.
@brapse brapse mentioned this pull request Jan 31, 2020
ebuchman
ebuchman previously approved these changes Feb 8, 2020
Contributor

@ebuchman ebuchman left a comment

Looking good! Some high level review with a few comments.

Main takeaways re the concurrency:

  1. The queues we're using for sending to routines from demux are unbounded. This may be fine actually, since we won't lose messages before crashing, but it should be more clearly documented and we may want to track if these queues ever do get above a certain size. It also means the Routine.send will never fail until we're done (ie. call Dispose), which seems good.

  2. demux itself never blocks until termination, since it's either sending on an unbounded queue to the routines (of course this uses a mutex under the hood), or it's using a TrySend on the p2p. Not entirely sure yet if there are blocking edge cases on termination that could cause problems in switching to consensus.

  3. Receive may block if the event channel is full. This is unlikely, given its size (1000), how many peers we have (~50), and how tight the demux loop is (which itself doesn't block), but we should probably track when Receive does block on the event channel as an indicator that the buffer may need to be bigger ... The p2p layer expects Receive not to block.

  4. Routines may block on their output channels, but this doesn't seem to be a problem as nothing blocks on the routines activity (since the input is an unbounded queue!).

Does all that sound right?

sendBlockToPeer(block *types.Block, peerID p2p.ID) error
sendBlockNotFound(height int64, peerID p2p.ID) error
sendStatusResponse(height int64, peerID p2p.ID) error
switchToConsensus(state state.State, blocksSynced int)
Contributor

I'm a fan of visual semantics in structs/interfaces. E.g. I'd put switchToConsensus last since it's not io like the others, and I'd have newlines separating the send block from the broadcast "block" and the switch "block".
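A rough sketch of that grouping (using the iIO name from the later commits); broadcastStatusRequest and the import paths are assumptions about the rest of the interface, not part of the quoted diff:

package v2

import (
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
)

type iIO interface {
	// send "block": io towards a single peer
	sendBlockRequest(peerID p2p.ID, height int64) error
	sendBlockToPeer(block *types.Block, peerID p2p.ID) error
	sendBlockNotFound(height int64, peerID p2p.ID) error
	sendStatusResponse(height int64, peerID p2p.ID) error

	// broadcast "block": io towards all peers (assumed method)
	broadcastStatusRequest(height int64) error

	// not io like the others, so it goes last
	switchToConsensus(state state.State, blocksSynced int)
}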

Contributor

Also is it worth having a receiving Io interface for the calls from Receive/AddPeer/RemovePeer that push msgs onto the events chan?

Contributor Author

Yes, all of this does sound right, and I tried to capture it in follow-up issue #4206.

time time.Time
//-------------------------------------

type bcBlockRequestMessage struct {
Contributor

Feels like these should go in io.go where they're used.

Contributor

At this point the messages/events are owned and defined by the entities that generate/send them.

// NewBlockchainReactor creates a new reactor instance.
func NewBlockchainReactor(state state.State, blockApplier blockApplier, store blockStore, fastSync bool) *BlockchainReactor {
reporter := behaviour.NewMockReporter()
return newReactor(state, store, reporter, blockApplier, 1000)
Contributor

Hard-coded buffer size should be a global var at the top of the file.

Contributor

Also this bufferSize has three distinct functionalities:

  1. It's the size of the reactor's events channel. This means the Receive routine will block when the events channel is full. We may need to log an error or track when this happens, or otherwise be responsive to it, since the P2P system expects Receive not to block!

  2. It's the initial allocated size for the priority queues (ie. input channels to the routines), but from what I can tell the implementation of those queues is actually unbounded, which may be fine, but it means this buffer is much less important.

  3. It's the size of the output channels for the routines. This means the routines' run loop can block on sending to this channel. We might want to also track when this happens.

These distinctions should be somehow clearer in the code.
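One way to make the three roles explicit, sketched with hypothetical constant names; the PR itself passes a single hard-coded 1000 for all of them:

// Hypothetical split of the one bufferSize value into three named sizes,
// so each role can be documented, tuned and instrumented separately.
const (
	// eventsChannelSize bounds the reactor's events channel; Receive blocks when it is full.
	eventsChannelSize = 1000

	// priorityQueueInitialSize is only an initial allocation; the routines' input queues are unbounded.
	priorityQueueInitialSize = 1000

	// routineOutputSize bounds each routine's output channel; a routine's run loop blocks when it is full.
	routineOutputSize = 1000
)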

Contributor Author

Tried to capture the queuing concerns in #4206

if err != nil {
return pcDuplicateBlock{}, nil

// enqueue block if height is higher than state height, else ignore it
Contributor

When/how can it happen that we receive a lower block here, and does/shouldn't it correlate to removing a peer?

Contributor

It is possible that the scheduler detected an error for the peer that sent us the block at height H and re-requested it from another peer. Examples are when this peer sends us a block we haven't asked for or lowers its height. Meanwhile the processor validated H, sent a pcProcessedBlock towards the scheduler and moved to H+1. Before the scheduler processes this, it receives and forwards scBlockReceived for H from a different peer towards the processor.
The processor and scheduler are two hops apart wrt communication, and there may be in-flight messages in both directions.

r.processor.send(event)
r.reporter.Report(behaviour.BadMessage(event.peerID, "scPeerError"))
case scBlockRequest:
r.io.sendBlockRequest(event.peerID, event.height)
Contributor

I think we need/want to track if this one fails ...

Contributor Author

Created follow-up issue #4414.

}
}

case *bcStatusResponseMessage:
Contributor

If events is full, these will block. This shouldn't really happen, as I'd expect the demux loop to run faster than this can saturate, but it might be worth tracking if it does happen (eg. by logging an error and/or incrementing a metric, while still doing a blocking send?), maybe with a common r.pushEvent(...) method.
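A rough sketch of such a helper; the r.events and r.metrics field names, and the metric itself, are assumptions:

// pushEvent funnels every send onto the events channel through one place,
// so a full buffer is surfaced before the (still blocking) send happens.
func (r *BlockchainReactor) pushEvent(event Event) {
	select {
	case r.events <- event:
	default:
		// Buffer is full: log and count it, then fall back to a blocking send
		// so the event is not dropped.
		r.logger.Error("blockchain v2 events channel full; Receive will block")
		r.metrics.EventsChannelFull.Add(1) // hypothetical metric
		r.events <- event
	}
}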

Contributor Author

Makes sense; should be covered in follow-up issue #4206.


@ebuchman
Contributor

ebuchman commented Feb 9, 2020

A few more thoughts from reviewing this and the new TLA+.

Removing Peers

Right now we remove blocks from the processor's internal queue if the peer was removed. This makes sense for peers that send us bad data, but for peers that only timed out, it might be aggressive to throw away their blocks. Should we consider checking the reason for the peer's removal before forwarding it to the processor?

Tickers

There are 4 tickers in the reactor and I wonder if we can/should try to reduce them to 1? It seems some functionality that is currently driven by ticks could be driven by real events instead, though FSMs may then need to be able to output multiple events at a time. Probably the only thing we should really need ticks for is detecting timeouts. In principle I think it means each routine (possibly the whole reactor) can/should have max 1 ticker. For instance:

doProcessBlock can be removed by just checking if we can process a block every time we receive one from the scheduler. Then the processor wouldn't need input from any tickers.

doSchedule can probably be eliminated if other calls to the scheduler check whether we should schedule more requests, and return scBlockRequest as necessary.

doStatus can probably be similarly eliminated by letting the scheduler decide when it needs updated statuses and outputting a new scStatusRequest, rather than just leaving this to the reactor.

doPrunePeer is then the only thing that's left, and it's only there to detect peer timeouts. In principle we could rename it to doTick and it could trigger scheduling block or status requests as well.

Does it make sense to consider this? Or is there stronger justification for the FSMs to only output one event at a time?
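A hypothetical sketch of what that could look like on the scheduler side; handleTick, rTick and the helper/field names are assumptions, and the key change is letting the FSM return more than one event per input:

// handleTick lets a single ticker event drive peer pruning, status requests
// and block-request scheduling, instead of separate tickers for each.
func (sc *scheduler) handleTick(event rTick) ([]Event, error) {
	var events []Event
	if prunable := sc.prunablePeers(event.time); len(prunable) > 0 { // prunablePeers is a hypothetical helper
		events = append(events, scPeersPruned{peers: prunable})
	}
	// ...similarly emit scStatusRequest and scBlockRequest events as needed...
	return events, nil
}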

Contributor

@ebuchman ebuchman left a comment

Comment about scSchedulerFail

}

return noOp, nil
}

func (sc *scheduler) handleAddNewPeer(event addNewPeer) (Event, error) {
func (sc *scheduler) handleAddNewPeer(event bcAddNewPeer) (Event, error) {
err := sc.addPeer(event.peerID)
if err != nil {
return scSchedulerFail{reason: err}, nil
Contributor

So it seems these failure events don't trigger anything, right? Note some of them seem like genuine internal errors (ie. the ones in handleTrySchedule), while others seem highly probable in practice. This one especially, because the peer layer may try to re-add a peer we disconnected from in the past. Other fails are due to race conditions. So it certainly doesn't seem like we can consider these fatal without more work, and we should double-check that it's ok that these will happen.

Contributor

Yeah, we need to comb through the cases and make sure we are not in a corrupt state.

Contributor Author

Made a follow-up issue: #4415.

delete(state.queue, state.height)
state.blocksSynced++
func (state *pcState) enqueue(peerID p2p.ID, block *types.Block, height int64) {
state.queue[height] = queueItem{block: block, peerID: peerID}
Contributor

should we check that no queueItem exists for that height before setting it?

Contributor

I believe this will overwrite the existing block.
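A minimal sketch of the guard being asked for here; the commit list at the bottom mentions "panic on duplicate block enqueued by processor", so the final code likely panics, but the exact message is an assumption (and fmt is assumed to be imported):

func (state *pcState) enqueue(peerID p2p.ID, block *types.Block, height int64) {
	if _, ok := state.queue[height]; ok {
		// Silently overwriting an already-queued block would hide a protocol
		// error, so fail loudly instead.
		panic(fmt.Sprintf("duplicate block %d enqueued by processor", height))
	}
	state.queue[height] = queueItem{block: block, peerID: peerID}
}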

Contributor Author

Fixed!


// ValidateBasic performs basic validation.
func (m *bcBlockResponseMessage) ValidateBasic() error {
return m.Block.ValidateBasic()
Contributor

what if Block is nil here?
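A sketch of the nil guard (the commit list at the bottom mentions accounting for a nil block in the block response message); the error message is an assumption, and errors is assumed to be imported:

// ValidateBasic performs basic validation.
func (m *bcBlockResponseMessage) ValidateBasic() error {
	if m.Block == nil {
		return errors.New("block response message has nil block")
	}
	return m.Block.ValidateBasic()
}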

syncHeight int64

reporter behaviour.Reporter
io iIo
Contributor

What does iIo stand for? Maybe there's a better name?

Contributor Author

Yeah, I think we roughly tried to isolate the input/output for testing but maybe didn't do such a good job. Totally open to recommendations ☺️

Contributor

@melekes melekes Feb 11, 2020

Okay. I just did not understand what the i prefix stands for. Interface? So iIo = IO interface?

Contributor Author

Yes, not the best naming, I'm sure.

<-r.processor.ready()
// reactor generated ticker events:
// ticker for cleaning peers
type rTryPrunePeer struct {
Contributor

Why do we need the r prefix? Because events are used outside of this file?

Contributor Author

We prefix the event with r to indicate that this is a reactor event.

Contributor

Could this be added to the doc.go file so someone doesn't remove it in the future?
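For example, a short note along these lines in doc.go; the wording is a suggestion and the package name v2 is assumed from the directory:

// Event naming convention: events prefixed with r (e.g. rTryPrunePeer,
// rTrySchedule) are generated by the reactor itself, typically from its
// tickers; the sc and pc prefixes mark events originating from the scheduler
// and the processor respectively.
package v2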


// Tickers: perform tasks periodically
case <-doScheduleCh:
r.scheduler.send(rTrySchedule{time: time.Now()})
Contributor

Can't we merge this with the above cases? I.e.

case <-doScheduleTk.C:
  r.scheduler.send(rTrySchedule{time: time.Now()})

call scheduler directly?

Contributor Author

Indeed, will address in follow-up PR #4410.

func (r *BlockchainReactor) AddPeer(peer p2p.Peer) {
err := r.io.sendStatusResponse(r.store.Height(), peer.ID())
if err != nil {
r.logger.Error("Could not send status message to peer new", "src", peer.ID, "height", r.SyncHeight())
Contributor

Is it ok to continue if we can't send something to a peer? Just want to double-check.

Contributor

I think it's ok. The peer will send a statusRequest later.

@codecov-io

codecov-io commented Feb 14, 2020

Codecov Report

Merging #4361 into master will decrease coverage by 0.02%.
The diff coverage is n/a.

@@            Coverage Diff             @@
##           master    #4361      +/-   ##
==========================================
- Coverage   65.49%   65.46%   -0.03%     
==========================================
  Files         228      228              
  Lines       20239    20239              
==========================================
- Hits        13255    13249       -6     
- Misses       5913     5918       +5     
- Partials     1071     1072       +1
Impacted Files Coverage Δ
consensus/replay.go 71.76% <0%> (-0.79%) ⬇️
consensus/reactor.go 77.16% <0%> (-0.7%) ⬇️
p2p/pex/pex_reactor.go 81.92% <0%> (-0.57%) ⬇️
blockchain/v0/pool.go 78.2% <0%> (-0.33%) ⬇️
consensus/state.go 77.74% <0%> (+0.1%) ⬆️
privval/signer_endpoint.go 83.78% <0%> (+2.7%) ⬆️
privval/signer_server.go 95.65% <0%> (+4.34%) ⬆️

@ancazamfir
Contributor

> A few more thoughts from reviewing this and the new TLA+.
>
> Removing Peers
>
> Right now we remove blocks from the processor's internal queue if the peer was removed. This makes sense for peers that send us bad data, but for peers that only timed out, it might be aggressive to throw away their blocks. Should we consider checking the reason for the peer's removal before forwarding it to the processor?

There isn't a peerRemove event going to the processor, only scPeerError. This is sent by the scheduler to the processor when a peer is deemed faulty (lowered its height, sent us a block we haven't asked for). A peer that is timed out by the scheduler is only removed from the scheduler. The scPeersPruned event is sunk in the reactor. While the scheduler removes all blocks from those peers, any unprocessed received blocks stay in the processor's queue and may be processed, or later removed or replaced by other blocks.
Then there is the peerRemove event that comes from the Switch, and that event is accompanied by a reason field. We ignore this, as the peer is gone anyway and we need to remove it. Again, any blocks from this peer are left in the processor's queue. Maybe here we should remove them if the reason is not nil.

> Tickers
>
> There are 4 tickers in the reactor and I wonder if we can/should try to reduce them to 1? It seems some functionality that is currently driven by ticks could be driven by real events instead, though FSMs may then need to be able to output multiple events at a time. Probably the only thing we should really need ticks for is detecting timeouts. In principle I think it means each routine (possibly the whole reactor) can/should have max 1 ticker. For instance:
>
> doProcessBlock can be removed by just checking if we can process a block every time we receive one from the scheduler. Then the processor wouldn't need input from any tickers.
>
> doSchedule can probably be eliminated if other calls to the scheduler check whether we should schedule more requests, and return scBlockRequest as necessary.
>
> doStatus can probably be similarly eliminated by letting the scheduler decide when it needs updated statuses and outputting a new scStatusRequest, rather than just leaving this to the reactor.
>
> doPrunePeer is then the only thing that's left, and it's only there to detect peer timeouts. In principle we could rename it to doTick and it could trigger scheduling block or status requests as well.
>
> Does it make sense to consider this? Or is there stronger justification for the FSMs to only output one event at a time?

Yes, I was thinking about this. But on the other hand I believe we should isolate the time-related handling from the FSMs, and there should be explicit events as if we had separate timers. This is very convenient for verification (as we discovered in the TLA+ specs for both v1 and v2).
I will look at having one ticker that generates multiple events for the scheduler and one for the processor.
I also found it cleaner to make the block requests on a ticker basis vs. triggered by other events... but we can discuss.
Should we cover this change under a different PR? It is a bit more involved.

@melekes
Contributor

melekes commented Feb 18, 2020

@brapse
Contributor Author

brapse commented Feb 18, 2020

Some of the mocks here are not used but probably should be. Created follow-up issue #4430.

melekes added a commit that referenced this pull request Feb 19, 2020
The work includes the reactor which ties together all the separate routines involved in the design of the blockchain v2 refactor. This PR replaces #4067 which got far too large and messy after a failed attempt to rebase.

## Commits:

* Blockchain v2 reactor:

	+ A cleaner copy of the work done in #4067, which fell too far behind and was a nightmare to rebase.
	+ The work includes the reactor which ties together all the separate routines involved in the design of the blockchain v2 refactor.

* fixes after merge

* reorder iIO interface methodset

* change iO -> IO

* panic before send nil block

* rename switchToConsensus -> trySwitchToConsensus

* rename tdState -> tmState

* Update blockchain/v2/reactor.go

Co-Authored-By: Bot from GolangCI <42910462+golangcibot@users.noreply.github.com>

* remove peer when it sends a block unsolicited

* check for not ready in markReceived

* fix error

* fix the pcFinished event

* typo fix

* add documentation for processor fields

* simplify time.Since

* try and make the linter happy

* some doc updates

* fix channel diagram

* Update adr-043-blockchain-riri-org.md

* panic on nil switch

* linting fixes

* account for nil block in bcBlockResponseMessage

* panic on duplicate block enqueued by processor

* linting

* goimport reactor_test.go

Co-authored-by: Bot from GolangCI <42910462+golangcibot@users.noreply.github.com>
Co-authored-by: Anca Zamfir <ancazamfir@users.noreply.github.com>
Co-authored-by: Marko <marbar3778@yahoo.com>
Co-authored-by: Anton Kaliaev <anton.kalyaev@gmail.com>
@melekes melekes closed this Feb 19, 2020
@melekes
Contributor

melekes commented Feb 19, 2020

Merged. For some reason GitHub did not close the PR. The commit is on master.

@melekes melekes deleted the brapse/blockchain-v2-riri-reactor-2 branch February 19, 2020 15:10