Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

peer, main, netsync, blockchain: parallel block downloads #2226

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
peer: make peer meet query.Peer interface
query.Peer is used for downloading blocks out of order during headers
first download.  Methods SubscribeRecvMsg() and OnDisconnect() are added
to abide by the interface.
  • Loading branch information
kcalvinalvin committed Nov 29, 2024
commit 16c6e139b76a443faa2f5ecd3f7484a8119abfb8
33 changes: 33 additions & 0 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,10 @@ type Peer struct {
queueQuit chan struct{}
outQuit chan struct{}
quit chan struct{}

// subscribers is a channel for relaying all messages that were received
// to this peer.
subscribers []chan wire.Message
}

// String returns the peer's address and directionality as a human-readable
Expand Down Expand Up @@ -1099,6 +1103,24 @@ func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte,
return msg, buf, nil
}

// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin
// messages received from this peer will be sent on the returned
// channel. A closure is also returned, that should be called to cancel
// the subscription.
func (p *Peer) SubscribeRecvMsg() (<-chan wire.Message, func()) {
msgChan := make(chan wire.Message, 1)
p.subscribers = append(p.subscribers, msgChan)

// Cancellation is just removing the channel from the subscribers list.
idx := len(p.subscribers) - 1
cancel := func() {
p.subscribers = append(p.subscribers[:idx],
p.subscribers[idx+1:]...)
}

return msgChan, cancel
}

// writeMessage sends a bitcoin message to the peer with logging.
func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error {
// Don't do anything if we're disconnecting.
Expand Down Expand Up @@ -1403,6 +1425,10 @@ out:
// needed.
rmsg, buf, err := p.readMessage(p.wireEncoding)
idleTimer.Stop()
// Send the received message to all the subscribers.
for _, sub := range p.subscribers {
sub <- rmsg
}
if err != nil {
// In order to allow regression tests with malformed messages, don't
// disconnect the peer when we're in regression test mode and the
Expand Down Expand Up @@ -1447,6 +1473,7 @@ out:
}
break out
}

atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}

Expand Down Expand Up @@ -1962,6 +1989,12 @@ func (p *Peer) Disconnect() {
close(p.quit)
}

// OnDisconnect returns a channel that will be closed when this peer is
// disconnected.
func (p *Peer) OnDisconnect() <-chan struct{} {
return p.quit
}

// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
Expand Down