Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Blockpool race fix #231

Merged
merged 2 commits into from
Jul 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ build_race:

test: build
go test `${NOVENDOR}`

test_race: build
go test -race `${NOVENDOR}`

test100: build
for i in {1..100}; do make test; done
Expand Down
83 changes: 37 additions & 46 deletions blockchain/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ type BlockPool struct {
QuitService
startTime time.Time

mtx sync.Mutex
// block requests
mtx sync.Mutex
requesters map[int]*bpRequester
height int // the lowest key in requesters.
numPending int32 // number of requests pending assignment or block response

// peers
peersMtx sync.Mutex
peers map[string]*bpPeer
peers map[string]*bpPeer

requestsCh chan<- BlockRequest
timeoutsCh chan<- string
Expand Down Expand Up @@ -81,13 +79,13 @@ func (pool *BlockPool) makeRequestersRoutine() {
if !pool.IsRunning() {
break
}
_, numPending := pool.GetStatus()
_, numPending, lenRequesters := pool.GetStatus()
if numPending >= maxPendingRequests {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
} else if len(pool.requesters) >= maxTotalRequesters {
} else if lenRequesters >= maxTotalRequesters {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
Expand All @@ -100,6 +98,9 @@ func (pool *BlockPool) makeRequestersRoutine() {
}

func (pool *BlockPool) removeTimedoutPeers() {
pool.mtx.Lock()
defer pool.mtx.Unlock()

for _, peer := range pool.peers {
if !peer.didTimeout && peer.numPending > 0 {
curRate := peer.recvMonitor.Status().CurRate
Expand All @@ -111,38 +112,35 @@ func (pool *BlockPool) removeTimedoutPeers() {
}
}
if peer.didTimeout {
pool.peersMtx.Lock() // Lock
pool.removePeer(peer.id)
pool.peersMtx.Unlock()
}
}
}

func (pool *BlockPool) GetStatus() (height int, numPending int32) {
pool.mtx.Lock() // Lock
func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

return pool.height, pool.numPending
return pool.height, pool.numPending, len(pool.requesters)
}

// TODO: relax conditions, prevent abuse.
func (pool *BlockPool) IsCaughtUp() bool {
pool.mtx.Lock()
defer pool.mtx.Unlock()

height := pool.height
pool.mtx.Unlock()

// Need at least 1 peer to be considered caught up.
if len(pool.peers) == 0 {
log.Debug("Blockpool has no peers")
return false
}

pool.peersMtx.Lock()
maxPeerHeight := 0
for _, peer := range pool.peers {
maxPeerHeight = MaxInt(maxPeerHeight, peer.height)
}
pool.peersMtx.Unlock()

isCaughtUp := (height > 0 || time.Now().Sub(pool.startTime) > 5*time.Second) && (maxPeerHeight == 0 || height >= maxPeerHeight)
log.Notice(Fmt("IsCaughtUp: %v", isCaughtUp), "height", height, "maxPeerHeight", maxPeerHeight)
Expand All @@ -153,7 +151,7 @@ func (pool *BlockPool) IsCaughtUp() bool {
// So we peek two blocks at a time.
// The caller will verify the commit.
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
pool.mtx.Lock() // Lock
pool.mtx.Lock()
defer pool.mtx.Unlock()

if r := pool.requesters[pool.height]; r != nil {
Expand All @@ -168,7 +166,7 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block)
// Pop the first block at pool.height
// It must have been validated by 'second'.Commit from PeekTwoBlocks().
func (pool *BlockPool) PopRequest() {
pool.mtx.Lock() // Lock
pool.mtx.Lock()
defer pool.mtx.Unlock()

if r := pool.requesters[pool.height]; r != nil {
Expand All @@ -188,21 +186,21 @@ func (pool *BlockPool) PopRequest() {
// Invalidates the block at the given height,
// removes the peer recorded on that request, and redoes the request from others.
func (pool *BlockPool) RedoRequest(height int) {
pool.mtx.Lock() // Lock
defer pool.mtx.Unlock()

pool.mtx.Lock()
request := pool.requesters[height]
pool.mtx.Unlock()

if request.block == nil {
PanicSanity("Expected block to be non-nil")
}
// RemovePeer will redo all requesters associated with this peer.
// TODO: record this malfeasance
pool.RemovePeer(request.peerID) // Lock on peersMtx.
pool.RemovePeer(request.peerID)
}

// TODO: ensure that blocks come in order for each peer.
func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) {
pool.mtx.Lock() // Lock
pool.mtx.Lock()
defer pool.mtx.Unlock()

requester := pool.requesters[block.Height]
Expand All @@ -212,7 +210,7 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int

if requester.setBlock(block, peerID) {
pool.numPending--
peer := pool.getPeer(peerID)
peer := pool.peers[peerID]
peer.decrPending(blockSize)
} else {
// Bad peer?
Expand All @@ -221,8 +219,8 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int

// Sets the peer's alleged blockchain height.
func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock()
pool.mtx.Lock()
defer pool.mtx.Unlock()

peer := pool.peers[peerID]
if peer != nil {
Expand All @@ -234,8 +232,8 @@ func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
}

func (pool *BlockPool) RemovePeer(peerID string) {
pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock()
pool.mtx.Lock()
defer pool.mtx.Unlock()

pool.removePeer(peerID)
}
Expand All @@ -250,22 +248,14 @@ func (pool *BlockPool) removePeer(peerID string) {
delete(pool.peers, peerID)
}

func (pool *BlockPool) getPeer(peerID string) *bpPeer {
pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock()

peer := pool.peers[peerID]
return peer
}

// Pick an available peer with at least the given minHeight.
// If no peers are available, returns nil.
func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
pool.peersMtx.Lock()
defer pool.peersMtx.Unlock()
pool.mtx.Lock()
defer pool.mtx.Unlock()

for _, peer := range pool.peers {
if peer.isBad() {
if peer.didTimeout {
pool.removePeer(peer.id)
continue
} else {
Expand All @@ -283,7 +273,7 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
}

func (pool *BlockPool) makeNextRequester() {
pool.mtx.Lock() // Lock
pool.mtx.Lock()
defer pool.mtx.Unlock()

nextHeight := pool.height + len(pool.requesters)
Expand Down Expand Up @@ -330,11 +320,13 @@ func (pool *BlockPool) debug() string {
type bpPeer struct {
pool *BlockPool
id string
height int
numPending int32
recvMonitor *flow.Monitor
timeout *time.Timer
didTimeout bool

mtx sync.Mutex
height int
numPending int32
timeout *time.Timer
didTimeout bool
}

func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer {
Expand Down Expand Up @@ -380,15 +372,14 @@ func (peer *bpPeer) decrPending(recvSize int) {
}

func (peer *bpPeer) onTimeout() {
peer.pool.mtx.Lock()
defer peer.pool.mtx.Unlock()

peer.pool.sendTimeout(peer.id)
log.Warn("SendTimeout", "peer", peer.id, "reason", "onTimeout")
peer.didTimeout = true
}

func (peer *bpPeer) isBad() bool {
return peer.didTimeout
}

//-------------------------------------

type bpRequester struct {
Expand Down
2 changes: 1 addition & 1 deletion blockchain/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ FOR_LOOP:
// ask for status updates
go bcR.BroadcastStatusRequest()
case _ = <-switchToConsensusTicker.C:
height, numPending := bcR.pool.GetStatus()
height, numPending, _ := bcR.pool.GetStatus()
outbound, inbound, _ := bcR.Switch.NumPeers()
log.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters),
"outbound", outbound, "inbound", inbound)
Expand Down