Blockchain-v2 reactor second PR #4361
Conversation
+ A cleaner copy of the work done in #4067, which fell too far behind and was a nightmare to rebase.
+ The work includes the reactor which ties together all the separate routines involved in the design of the blockchain v2 refactor.
Looking good! Some high level review with a few comments.

Main takeaways re the concurrency:

- The queues we're using for sending to routines from demux are unbounded. This may be fine actually, since we won't lose messages before crashing, but it should be more clearly documented, and we may want to track if these queues ever do get above a certain size (a rough sketch of such tracking is below). It also means Routine.send will never fail until we're done (ie. call Dispose), which seems good.
- demux itself never blocks until termination, since it's either sending on an unbounded queue to the routines (of course this uses a mutex under the hood), or it's using a TrySend on the p2p. Not entirely sure yet if there are blocking edge cases on termination that could cause problems in switching to consensus.
- Receive may block if the events channel is full. This is unlikely, given its size (1000), how many peers we have (~50), and how tight the demux loop is (which itself doesn't block), but we should probably track when Receive does block on the events channel as an indicator that the buffer may need to be bigger ... The p2p layer expects Receive not to block.
- Routines may block on their output channels, but this doesn't seem to be a problem, as nothing blocks on the routines' activity (since the input is an unbounded queue!).

Does all that sound right?
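To illustrate the "track queue depth" idea, a minimal, self-contained sketch. The type and threshold are hypothetical stand-ins for the routines' unbounded input queues, not the PR's actual implementation: pushes never block or drop, but a warning is emitted when the backlog gets large.

package main

import "log"

// queueWarnThreshold is an arbitrary, assumed limit for illustration only.
const queueWarnThreshold = 10000

// unboundedQueue is a toy stand-in for a routine's unbounded input queue.
type unboundedQueue struct {
    name  string
    items []interface{}
}

// push never blocks and never drops; it only reports when the backlog gets large.
func (q *unboundedQueue) push(v interface{}) {
    q.items = append(q.items, v)
    if len(q.items) == queueWarnThreshold {
        log.Printf("queue %q backlog reached %d items", q.name, len(q.items))
    }
}

// pop returns the oldest item, or false if the queue is empty.
func (q *unboundedQueue) pop() (interface{}, bool) {
    if len(q.items) == 0 {
        return nil, false
    }
    v := q.items[0]
    q.items = q.items[1:]
    return v, true
}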
blockchain/v2/io.go
Outdated
sendBlockToPeer(block *types.Block, peerID p2p.ID) error
sendBlockNotFound(height int64, peerID p2p.ID) error
sendStatusResponse(height int64, peerID p2p.ID) error
switchToConsensus(state state.State, blocksSynced int)
I'm a fan of visual semantics in structs/interfaces, eg. I'd put switchToConsensus last since it's not io like the others, and I'd have newlines separating the send block from the broadcast "block" and the switch "block".
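A rough sketch of what that grouping could look like. The broadcastStatusRequest method and anything beyond the lines quoted above are assumptions about the rest of the interface, not code from this PR:

type iIO interface {
    // send "block": per-peer IO
    sendBlockRequest(peerID p2p.ID, height int64) error
    sendBlockToPeer(block *types.Block, peerID p2p.ID) error
    sendBlockNotFound(height int64, peerID p2p.ID) error
    sendStatusResponse(height int64, peerID p2p.ID) error

    // broadcast "block"
    broadcastStatusRequest(height int64) error

    // switch "block": not really IO, so it goes last
    switchToConsensus(state state.State, blocksSynced int)
}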
Also, is it worth having a receiving Io interface for the calls from Receive/AddPeer/RemovePeer that push msgs onto the events chan?
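A purely hypothetical sketch of such a receiving interface; none of these names exist in the PR, the point is just to mirror the sending side for the calls that push onto the events channel:

type iReceiveIO interface {
    pushBlockResponse(peerID p2p.ID, block *types.Block)
    pushStatusResponse(peerID p2p.ID, height int64)
    pushBlockRequest(peerID p2p.ID, height int64)
    pushAddPeer(peerID p2p.ID)
    pushRemovePeer(peerID p2p.ID)
}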
Yes, all of this does sound right, and I tried to capture it in a follow-up issue: #4206.
blockchain/v2/reactor.go
Outdated
time time.Time
//-------------------------------------

type bcBlockRequestMessage struct {
Feels like these should go in io.go, where they're used.
At this point the messages/events are owned and defined by the entities that generate/send them.
blockchain/v2/reactor.go
Outdated
// NewBlockchainReactor creates a new reactor instance.
func NewBlockchainReactor(state state.State, blockApplier blockApplier, store blockStore, fastSync bool) *BlockchainReactor {
    reporter := behaviour.NewMockReporter()
    return newReactor(state, store, reporter, blockApplier, 1000)
The hard-coded buffer size should be a global var at the top of the file.
Also, this bufferSize has three distinct functionalities:

- It's the size of the reactor's events channel. This means the Receive routine will block when the events channel is full. We may need to log an error or track when this happens, or otherwise be responsive to it, since the P2P system expects Receive not to block!
- It's the initial allocated size for the priority queues (ie. the input channels to the routines), but from what I can tell the implementation of those queues is actually unbounded, which may be fine, but it means this buffer is much less important.
- It's the size of the output channels for the routines. This means the routines' run loop can block on sending to this channel. We might want to also track when this happens.

These distinctions should be somehow clearer in the code, e.g. as separate named constants (see the sketch below).
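For illustration, one way the three roles could be made explicit. This is a sketch with assumed constant names, keeping the current value of 1000 for all three:

const (
    // size of the reactor's events channel; Receive blocks when it is full
    eventsChannelBufferSize = 1000

    // initial allocation for the routines' (unbounded) input priority queues
    routineQueueInitialSize = 1000

    // size of the routines' output channels; a routine's run loop blocks when its output is full
    routineOutputBufferSize = 1000
)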
Tried to capture the queuing concerns in #4206
blockchain/v2/processor.go
Outdated
if err != nil {
    return pcDuplicateBlock{}, nil

// enqueue block if height is higher than state height, else ignore it
When/how can it happen that we receive a lower block here, and does/shouldn't it correlate to removing a peer?
It is possible that the scheduler detected an error for the peer that sent us the block at height H and re-requested it from another peer. Examples are when this peer sends us a block we haven't asked for, or lowered its height. Meanwhile the processor validated H, sent a pcProcessedBlock towards the scheduler and moved to H+1. Before the scheduler processes this, it receives and forwards an scBlockReceived for H from a different peer towards the processor. The processor and scheduler are two hops apart wrt communication, and there may be in-flight messages in both directions.
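For context, a heavily simplified sketch of the guard being discussed. The handler name, its signature, and the use of noOp here are assumptions rather than code from this PR; the point is only that a stale response at or below the current height can be a leftover of the re-request race described above, so dropping it is not, by itself, peer misbehaviour:

func (state *pcState) handleBlockResponse(peerID p2p.ID, block *types.Block) (Event, error) {
    // enqueue block if height is higher than state height, else ignore it:
    // a response for an already-processed height is expected during the
    // re-request race and is dropped silently here.
    if block.Height <= state.height {
        return noOp, nil
    }
    state.enqueue(peerID, block, block.Height)
    return noOp, nil
}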
blockchain/v2/reactor.go
Outdated
    r.processor.send(event)
    r.reporter.Report(behaviour.BadMessage(event.peerID, "scPeerError"))
case scBlockRequest:
    r.io.sendBlockRequest(event.peerID, event.height)
I think we need/want to track if this one fails ...
Created a follow-up issue: #4414.
blockchain/v2/reactor.go
Outdated
}
}

case *bcStatusResponseMessage:
If events is full these will block. This shouldn't really happen, as I'd expect the demux loop to run faster than this can saturate, but it might be worth tracking if it does happen (eg. by logging an error and/or incrementing a metric, but still trying to do a blocking send?), maybe with a common r.pushEvent(...) method.
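A rough sketch of what such a helper could look like. The field names r.events and r.logger, and the Event type used here, are assumptions about the reactor's internals:

func (r *BlockchainReactor) pushEvent(ev Event) {
    select {
    case r.events <- ev:
        // fast path: the buffer had room
    default:
        // buffer full: make it visible (log and/or bump a metric), then still do a
        // blocking send so the message is not lost; the demux loop should drain shortly
        r.logger.Error("events channel full, Receive is blocking")
        r.events <- ev
    }
}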
Makes sense, should be covered in follow-up issue #4206.
Few more thoughts from reviewing this and the new TLA+.

Removing Peers: right now we remove blocks from the processor's internal queue if the peer was removed. This makes sense for peers that send us bad data, but for peers that only timed out, it might be aggressive to throw away their blocks. Should we consider checking the reason for the peer to be removed before forwarding it to the processor?

Tickers: there are 4 tickers in the reactor and I wonder if we can/should try to reduce it to 1? It seems some functionality that is currently driven by ticks can be driven by real events instead, though FSMs may then need to be able to output multiple events at a time. Probably the only thing we should really need ticks for is detecting timeouts. In principle I think it means each routine (possibly the whole reactor) can/should have max 1 ticker. For instance, see the sketch below.

Does it make sense to consider this? Or is there stronger justification for the FSMs to only output one event at a time?
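To make the "one ticker per routine" idea concrete, a small, self-contained sketch (all names invented, nothing from the PR): a single ticker is used only to detect lack of progress, while the actual work is driven by events.

package main

import (
    "fmt"
    "time"
)

func main() {
    events := make(chan string, 16)
    ticker := time.NewTicker(100 * time.Millisecond) // the routine's single ticker
    defer ticker.Stop()

    lastProgress := time.Now()
    timeout := 300 * time.Millisecond

    go func() {
        events <- "blockReceived"
        events <- "blockProcessed"
    }()

    for {
        select {
        case ev := <-events:
            // event-driven work (scheduling, pruning, ...) happens in response to events
            lastProgress = time.Now()
            fmt.Println("handled", ev)
        case <-ticker.C:
            // the ticker's only job is to notice that nothing has happened for too long
            if time.Since(lastProgress) > timeout {
                fmt.Println("timed out: no progress, a real routine would report/prune here")
                return
            }
        }
    }
}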
Comment about scSchedulerFail
blockchain/v2/scheduler.go
Outdated
}

    return noOp, nil
}

-func (sc *scheduler) handleAddNewPeer(event addNewPeer) (Event, error) {
+func (sc *scheduler) handleAddNewPeer(event bcAddNewPeer) (Event, error) {
    err := sc.addPeer(event.peerID)
    if err != nil {
        return scSchedulerFail{reason: err}, nil
So it seems these failure events don't trigger anything, right? Note some of them seem like genuine internal errors (ie. the ones in handleTrySchedule), while others seem highly probable in practice. This one especially, because the peer layer may try to re-add a peer we disconnected from in the past. Other fails are due to race conditions. So it certainly doesn't seem like we can consider these fatal without more work, and we should double check that it's ok that these will happen.
yeah, we need to comb through the cases and make sure we are not in a corrupt state.
Made a follow-up issue: #4415.
blockchain/v2/processor.go
Outdated
delete(state.queue, state.height)
state.blocksSynced++
func (state *pcState) enqueue(peerID p2p.ID, block *types.Block, height int64) {
    state.queue[height] = queueItem{block: block, peerID: peerID}
Should we check that no queueItem exists for that height before setting it?
I believe this will overwrite the existing block.
Fixed!
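For reference, roughly the shape the fix took according to the commit list ("panic on duplicate block enqueued by processor"); the exact check and message below are assumptions:

func (state *pcState) enqueue(peerID p2p.ID, block *types.Block, height int64) {
    // a second block for the same height indicates a processor bug, so fail loudly
    if _, ok := state.queue[height]; ok {
        panic(fmt.Sprintf("duplicate block enqueued at height %d", height))
    }
    state.queue[height] = queueItem{block: block, peerID: peerID}
}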
blockchain/v2/reactor.go
Outdated
// ValidateBasic performs basic validation.
func (m *bcBlockResponseMessage) ValidateBasic() error {
    return m.Block.ValidateBasic()
What if Block is nil here?
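The commit list mentions "account for nil block in bcBlockResponseMessage"; a sketch of that check (assumes the standard errors package is imported, and the error message is an assumption):

// ValidateBasic performs basic validation.
func (m *bcBlockResponseMessage) ValidateBasic() error {
    if m.Block == nil {
        return errors.New("block response message has nil block")
    }
    return m.Block.ValidateBasic()
}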
blockchain/v2/reactor.go
Outdated
syncHeight int64

reporter behaviour.Reporter
io       iIo
What does iIo stand for? Maybe there's a better name?
Yeah, I think we roughly tried to isolate the input-output for testing but maybe didn't do such a good job. Totally open to recommendations.
Okay. I just did not understand what the i prefix stands for. Interface? So iIo = IO interface?
Yes, not the best naming, I'm sure.
blockchain/v2/reactor.go
Outdated
<-r.processor.ready()
// reactor generated ticker events:
// ticker for cleaning peers
type rTryPrunePeer struct {
Why do we need the r prefix? Because events are used outside of this file?
We prefix the event with r to indicate that this is a reactor event.
Could this be added to the doc.go file, so someone doesn't remove it in the future?
blockchain/v2/reactor.go
Outdated
// Tickers: perform tasks periodically
case <-doScheduleCh:
    r.scheduler.send(rTrySchedule{time: time.Now()})
Can't we merge this with the above cases, i.e.

case <-doScheduleTk.C:
    r.scheduler.send(rTrySchedule{time: time.Now()})

and call the scheduler directly?
Indeed, will address in a follow-up PR #4410.
blockchain/v2/reactor.go
Outdated
func (r *BlockchainReactor) AddPeer(peer p2p.Peer) {
    err := r.io.sendStatusResponse(r.store.Height(), peer.ID())
    if err != nil {
        r.logger.Error("Could not send status message to peer new", "src", peer.ID, "height", r.SyncHeight())
Is it ok to continue if we can't send something to a peer? Just want to double-check.
I think it's ok. The peer will send a statusRequest later.
Codecov Report
@@            Coverage Diff             @@
##           master    #4361      +/-   ##
==========================================
- Coverage   65.49%   65.46%   -0.03%
==========================================
  Files         228      228
  Lines       20239    20239
==========================================
- Hits        13255    13249       -6
- Misses       5913     5918       +5
- Partials     1071     1072       +1
There isn't a
Yes, I was thinking about this. But on the other hand, I believe we should isolate the time-related handling from the FSMs, and there should be explicit events, as if we would have separate timers. This is very convenient for verification (as we discovered in the TLA+ specs for both v1 and v2).
@brapse do you want me to fix https://golangci.com/r/github.com/tendermint/tendermint/pulls/4361?
Some of the mocks here are not used but probably should be. Created a follow-up issue: #4430.
The work includes the reactor which ties together all the separate routines involved in the design of the blockchain v2 refactor. This PR replaces #4067, which got far too large and messy after a failed attempt to rebase.

## Commits:

* Blockchain v2 reactor:
  + A cleaner copy of the work done in #4067 which fell too far behind and was a nightmare to rebase.
  + The work includes the reactor which ties together all the separate routines involved in the design of the blockchain v2 refactor.
* fixes after merge
* reorder iIO interface methodset
* change iO -> IO
* panic before send nil block
* rename switchToConsensus -> trySwitchToConsensus
* rename tdState -> tmState
* Update blockchain/v2/reactor.go
* remove peer when it sends a block unsolicited
* check for not ready in markReceived
* fix error
* fix the pcFinished event
* typo fix
* add documentation for processor fields
* simplify time.Since
* try and make the linter happy
* some doc updates
* fix channel diagram
* Update adr-043-blockchain-riri-org.md
* panic on nil switch
* linting fixes
* account for nil block in bcBlockResponseMessage
* panic on duplicate block enqueued by processor
* linting
* goimport reactor_test.go

Co-authored-by: Bot from GolangCI <42910462+golangcibot@users.noreply.github.com>
Co-authored-by: Anca Zamfir <ancazamfir@users.noreply.github.com>
Co-authored-by: Marko <marbar3778@yahoo.com>
Co-authored-by: Anton Kaliaev <anton.kalyaev@gmail.com>
Merged. For some reason GitHub did not close the PR. The commit is on master.