diff --git a/Makefile b/Makefile index 46f6cb57f94..c73e771946d 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ .PHONY: get_deps build all list_deps install -all: install +all: test install TMROOT = $${TMROOT:-$$HOME/.tendermint} -install: +install: get_deps go install github.com/tendermint/tendermint/cmd/tendermint build: @@ -32,7 +32,7 @@ list_deps: go list -f '{{join .Deps "\n"}}' github.com/tendermint/tendermint/... | xargs go list -f '{{if not .Standard}}{{.ImportPath}}{{end}}' get_deps: - go get github.com/tendermint/tendermint/... + go get -d github.com/tendermint/tendermint/... revision: -echo `git rev-parse --verify HEAD` > $(TMROOT)/revision diff --git a/blockchain/reactor.go b/blockchain/reactor.go index ca12f3c71f7..7bcd1cb8829 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -42,20 +42,20 @@ type consensusReactor interface { type BlockchainReactor struct { p2p.BaseReactor - sw *p2p.Switch - state *sm.State - proxyAppCtx proxy.AppContext // same as consensus.proxyAppCtx - store *BlockStore - pool *BlockPool - sync bool - requestsCh chan BlockRequest - timeoutsCh chan string - lastBlock *types.Block + sw *p2p.Switch + state *sm.State + proxyAppConn proxy.AppConn // same as consensus.proxyAppConn + store *BlockStore + pool *BlockPool + sync bool + requestsCh chan BlockRequest + timeoutsCh chan string + lastBlock *types.Block evsw *events.EventSwitch } -func NewBlockchainReactor(state *sm.State, proxyAppCtx proxy.AppContext, store *BlockStore, sync bool) *BlockchainReactor { +func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConn, store *BlockStore, sync bool) *BlockchainReactor { if state.LastBlockHeight == store.Height()-1 { store.height -= 1 // XXX HACK, make this better } @@ -70,13 +70,13 @@ func NewBlockchainReactor(state *sm.State, proxyAppCtx proxy.AppContext, store * timeoutsCh, ) bcR := &BlockchainReactor{ - state: state, - proxyAppCtx: proxyAppCtx, - store: store, - pool: pool, - sync: sync, - requestsCh: requestsCh, - timeoutsCh: timeoutsCh, + state: state, + proxyAppConn: proxyAppConn, + store: store, + pool: pool, + sync: sync, + requestsCh: requestsCh, + timeoutsCh: timeoutsCh, } bcR.BaseReactor = *p2p.NewBaseReactor(log, "BlockchainReactor", bcR) return bcR @@ -231,16 +231,18 @@ FOR_LOOP: break SYNC_LOOP } else { bcR.pool.PopRequest() - err := bcR.state.ExecBlock(bcR.proxyAppCtx, first, firstPartsHeader) + err := bcR.state.ExecBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader) if err != nil { // TODO This is bad, are we zombie? PanicQ(Fmt("Failed to process committed block: %v", err)) } - err = bcR.state.Commit(bcR.proxyAppCtx) - if err != nil { - // TODO Handle gracefully. - PanicQ(Fmt("Failed to commit block at application: %v", err)) - } + /* + err = bcR.proxyAppConn.CommitSync() + if err != nil { + // TODO Handle gracefully. + PanicQ(Fmt("Failed to commit block at application: %v", err)) + } + */ bcR.store.SaveBlock(first, firstParts, second.LastValidation) bcR.state.Save() } diff --git a/consensus/common_test.go b/consensus/common_test.go index c0b82c3cf33..9f7bc6695df 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "sort" + "sync" "testing" "time" @@ -310,17 +311,15 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { blockStore := bc.NewBlockStore(blockDB) // one for mempool, one for consensus - app := example.NewCounterApplication(false) - appCMem := app.Open() - appCCon := app.Open() - proxyAppCtxMem := proxy.NewLocalAppContext(appCMem) - proxyAppCtxCon := proxy.NewLocalAppContext(appCCon) + mtx, app := new(sync.Mutex), example.NewCounterApplication(false) + proxyAppConnMem := proxy.NewLocalAppConn(mtx, app) + proxyAppConnCon := proxy.NewLocalAppConn(mtx, app) // Make Mempool - mempool := mempl.NewMempool(proxyAppCtxMem) + mempool := mempl.NewMempool(proxyAppConnMem) // Make ConsensusReactor - cs := NewConsensusState(state, proxyAppCtxCon, blockStore, mempool) + cs := NewConsensusState(state, proxyAppConnCon, blockStore, mempool) cs.SetPrivValidator(privVals[0]) evsw := events.NewEventSwitch() diff --git a/consensus/state.go b/consensus/state.go index 782efef0280..dd7ec5550be 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -24,7 +24,7 @@ var ( timeoutPrevoteDelta = 0500 * time.Millisecond // timeoutPrevoteN is timeoutPrevote0 + timeoutPrevoteDelta*N timeoutPrecommit0 = 1000 * time.Millisecond // After any +2/3 precommits received, wait this long for stragglers. timeoutPrecommitDelta = 0500 * time.Millisecond // timeoutPrecommitN is timeoutPrecommit0 + timeoutPrecommitDelta*N - timeoutCommit = 100 * time.Millisecond // After +2/3 commits received for committed block, wait this long for stragglers in the next height's RoundStepNewHeight. + timeoutCommit = 1000 * time.Millisecond // After +2/3 commits received for committed block, wait this long for stragglers in the next height's RoundStepNewHeight. ) @@ -173,16 +173,14 @@ func (ti *timeoutInfo) String() string { type ConsensusState struct { QuitService - proxyAppCtx proxy.AppContext + proxyAppConn proxy.AppConn blockStore *bc.BlockStore mempool *mempl.Mempool privValidator *types.PrivValidator mtx sync.Mutex RoundState - state *sm.State // State until height-1. - stagedBlock *types.Block // Cache last staged block. - stagedState *sm.State // Cache result of staged block. + state *sm.State // State until height-1. peerMsgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes) internalMsgQueue chan msgInfo // like peerMsgQueue but for our own proposals, parts, votes @@ -191,14 +189,13 @@ type ConsensusState struct { tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine evsw *events.EventSwitch - evc *events.EventCache // set in stageBlock and passed into state nSteps int // used for testing to limit the number of transitions the state makes } -func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { +func NewConsensusState(state *sm.State, proxyAppConn proxy.AppConn, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { cs := &ConsensusState{ - proxyAppCtx: proxyAppCtx, + proxyAppConn: proxyAppConn, blockStore: blockStore, mempool: mempool, peerMsgQueue: make(chan msgInfo, msgQueueSize), @@ -416,7 +413,7 @@ func (cs *ConsensusState) updateToState(state *sm.State) { // Reset fields based on state. validators := state.Validators - height := state.LastBlockHeight + 1 // next desired block height + height := state.LastBlockHeight + 1 // Next desired block height lastPrecommits := (*types.VoteSet)(nil) if cs.CommitRound > -1 && cs.Votes != nil { if !cs.Votes.Precommits(cs.CommitRound).HasTwoThirdsMajority() { @@ -452,8 +449,6 @@ func (cs *ConsensusState) updateToState(state *sm.State) { cs.LastValidators = state.LastValidators cs.state = state - cs.stagedBlock = nil - cs.stagedState = nil // Finally, broadcast RoundState cs.newStep() @@ -795,8 +790,8 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts return } - // Mempool run transactions and the resulting hash - txs, hash, err := cs.mempool.Reap() + // Mempool validated transactions + txs, err := cs.mempool.Reap() if err != nil { log.Warn("createProposalBlock: Error getting proposal txs", "error", err) return nil, nil @@ -812,7 +807,7 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts LastBlockHash: cs.state.LastBlockHash, LastBlockParts: cs.state.LastBlockParts, ValidatorsHash: cs.state.Validators.Hash(), - AppHash: hash, + AppHash: cs.state.AppHash, // state merkle root of txs from the previous block. }, LastValidation: validation, Data: &types.Data{ @@ -878,8 +873,8 @@ func (cs *ConsensusState) doPrevote(height int, round int) { return } - // Try staging cs.ProposalBlock - err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts) + // Valdiate proposal block + err := cs.state.ValidateBlock(cs.ProposalBlock) if err != nil { // ProposalBlock is invalid, prevote nil. log.Warn("enterPrevote: ProposalBlock is invalid", "error", err) @@ -992,7 +987,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { if cs.ProposalBlock.HashesTo(hash) { log.Info("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", hash) // Validate the block. - if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil { + if err := cs.state.ValidateBlock(cs.ProposalBlock); err != nil { PanicConsensus(Fmt("enterPrecommit: +2/3 prevoted for an invalid block: %v", err)) } cs.LockedRound = round @@ -1120,27 +1115,64 @@ func (cs *ConsensusState) finalizeCommit(height int) { } hash, header, ok := cs.Votes.Precommits(cs.CommitRound).TwoThirdsMajority() + block, blockParts := cs.ProposalBlock, cs.ProposalBlockParts if !ok { PanicSanity(Fmt("Cannot finalizeCommit, commit does not have two thirds majority")) } - if !cs.ProposalBlockParts.HasHeader(header) { + if !blockParts.HasHeader(header) { PanicSanity(Fmt("Expected ProposalBlockParts header to be commit header")) } - if !cs.ProposalBlock.HashesTo(hash) { + if !block.HashesTo(hash) { PanicSanity(Fmt("Cannot finalizeCommit, ProposalBlock does not hash to commit hash")) } - if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil { + if err := cs.state.ValidateBlock(block); err != nil { PanicConsensus(Fmt("+2/3 committed an invalid block: %v", err)) } - log.Notice("Finalizing commit of block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash()) - log.Info(Fmt("%v", cs.ProposalBlock)) - // We have the block, so stage/save/commit-vote. - cs.saveBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Votes.Precommits(cs.CommitRound)) + log.Notice("Finalizing commit of block", "height", block.Height, "hash", block.Hash()) + log.Info(Fmt("%v", block)) + + // Fire off event for new block. + // TODO: Handle app failure. See #177 + cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block}) + + // Create a copy of the state for staging + stateCopy := cs.state.Copy() + + // Run the block on the State: + // + update validator sets + // + run txs on the proxyAppConn + err := stateCopy.ExecBlock(cs.evsw, cs.proxyAppConn, block, blockParts.Header()) + if err != nil { + // TODO: handle this gracefully. + PanicQ(Fmt("Exec failed for application")) + } + + // Save to blockStore. + if cs.blockStore.Height() < block.Height { + commits := cs.Votes.Precommits(cs.CommitRound) + seenValidation := commits.MakeValidation() + cs.blockStore.SaveBlock(block, blockParts, seenValidation) + } + + /* + // Commit to proxyAppConn + err = cs.proxyAppConn.CommitSync() + if err != nil { + // TODO: handle this gracefully. + PanicQ(Fmt("Commit failed for application")) + } + */ + + // Save the state. + stateCopy.Save() + + // Update mempool. + cs.mempool.Update(block.Height, block.Txs) // NewHeightStep! - cs.updateToState(cs.stagedState) + cs.updateToState(stateCopy) // cs.StartTime is already set. // Schedule Round0 to start soon. @@ -1352,39 +1384,6 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string return } -func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartSet) error { - if block == nil { - PanicSanity("Cannot stage nil block") - } - - // Already staged? - blockHash := block.Hash() - if cs.stagedBlock != nil && len(blockHash) != 0 && bytes.Equal(cs.stagedBlock.Hash(), blockHash) { - return nil - } - - // Create a new event cache to cache all events. - cs.evc = events.NewEventCache(cs.evsw) - - // Create a copy of the state for staging - stateCopy := cs.state.Copy() - stateCopy.SetEventCache(cs.evc) - - // Run the block on the State: - // + update validator sets - // + first rolls back proxyAppCtx - // + run txs on the proxyAppCtx or rollback - err := stateCopy.ExecBlock(cs.proxyAppCtx, block, blockParts.Header()) - if err != nil { - return err - } - - // Everything looks good! - cs.stagedBlock = block - cs.stagedState = stateCopy - return nil -} - func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSetHeader) (*types.Vote, error) { vote := &types.Vote{ Height: cs.Height, @@ -1415,41 +1414,6 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part } } -// Save Block, save the +2/3 Commits we've seen -func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSet, commits *types.VoteSet) { - - // The proposal must be valid. - if err := cs.stageBlock(block, blockParts); err != nil { - PanicSanity(Fmt("saveBlock() an invalid block: %v", err)) - } - - // Save to blockStore. - if cs.blockStore.Height() < block.Height { - seenValidation := commits.MakeValidation() - cs.blockStore.SaveBlock(block, blockParts, seenValidation) - } - - // Commit to proxyAppCtx - err := cs.stagedState.Commit(cs.proxyAppCtx) - if err != nil { - // TODO: handle this gracefully. - PanicQ(Fmt("Commit failed for applicaiton")) - } - - // Save the state. - cs.stagedState.Save() - - // Update mempool. - cs.mempool.Update(block) - - // Fire off event - if cs.evsw != nil && cs.evc != nil { - cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block}) - go cs.evc.Flush() - } - -} - //--------------------------------------------------------- func CompareHRS(h1, r1 int, s1 RoundStepType, h2, r2 int, s2 RoundStepType) int { diff --git a/consensus/state_test.go b/consensus/state_test.go index d6e8682a46d..fbe6a3db534 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -209,7 +209,7 @@ func TestBadProposal(t *testing.T) { // wait for proposal <-proposalCh - //wait for prevote + // wait for prevote <-voteCh validatePrevote(t, cs1, round, vss[0], nil) diff --git a/mempool/mempool.go b/mempool/mempool.go index 1684a2bf733..f18c75f2182 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -1,7 +1,6 @@ package mempool import ( - "bytes" "container/list" "sync" "sync/atomic" @@ -15,7 +14,7 @@ import ( /* -The mempool pushes new txs onto the proxyAppCtx. +The mempool pushes new txs onto the proxyAppConn. It gets a stream of (req, res) tuples from the proxy. The memool stores good txs in a concurrent linked-list. @@ -24,14 +23,14 @@ safely by calling .NextWait() on each element. So we have several go-routines: 1. Consensus calling Update() and Reap() synchronously -2. Many mempool reactor's peer routines calling AppendTx() +2. Many mempool reactor's peer routines calling CheckTx() 3. Many mempool reactor's peer routines traversing the txs linked list 4. Another goroutine calling GarbageCollectTxs() periodically To manage these goroutines, there are three methods of locking. 1. Mutations to the linked-list is protected by an internal mtx (CList is goroutine-safe) 2. Mutations to the linked-list elements are atomic -3. AppendTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx +3. CheckTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx Garbage collection of old elements from mempool.txs is handlde via the DetachPrev() call, which makes old elements not reachable by @@ -42,12 +41,11 @@ peer broadcastTxRoutine() automatically garbage collected. const cacheSize = 100000 type Mempool struct { - proxyMtx sync.Mutex - proxyAppCtx proxy.AppContext - txs *clist.CList // concurrent linked-list of good txs - counter int64 // simple incrementing counter - height int // the last block Update()'d to - expected *clist.CElement // pointer to .txs for next response + proxyMtx sync.Mutex + proxyAppConn proxy.AppConn + txs *clist.CList // concurrent linked-list of good txs + counter int64 // simple incrementing counter + height int // the last block Update()'d to // Keep a cache of already-seen txs. // This reduces the pressure on the proxyApp. @@ -55,18 +53,17 @@ type Mempool struct { cacheList *list.List } -func NewMempool(proxyAppCtx proxy.AppContext) *Mempool { +func NewMempool(proxyAppConn proxy.AppConn) *Mempool { mempool := &Mempool{ - proxyAppCtx: proxyAppCtx, - txs: clist.New(), - counter: 0, - height: 0, - expected: nil, + proxyAppConn: proxyAppConn, + txs: clist.New(), + counter: 0, + height: 0, cacheMap: make(map[string]struct{}, cacheSize), cacheList: list.New(), } - proxyAppCtx.SetResponseCallback(mempool.resCb) + proxyAppConn.SetResponseCallback(mempool.resCb) return mempool } @@ -78,7 +75,7 @@ func (mem *Mempool) TxsFrontWait() *clist.CElement { // Try a new transaction in the mempool. // Potentially blocking if we're blocking on Update() or Reap(). -func (mem *Mempool) AppendTx(tx types.Tx) (err error) { +func (mem *Mempool) CheckTx(tx types.Tx) (err error) { mem.proxyMtx.Lock() defer mem.proxyMtx.Unlock() @@ -96,70 +93,43 @@ func (mem *Mempool) AppendTx(tx types.Tx) (err error) { mem.cacheList.PushBack(tx) // END CACHE - if err = mem.proxyAppCtx.Error(); err != nil { + if err = mem.proxyAppConn.Error(); err != nil { return err } - mem.proxyAppCtx.AppendTxAsync(tx) + mem.proxyAppConn.CheckTxAsync(tx) return nil } // TMSP callback function -// CONTRACT: No other goroutines mutate mem.expected concurrently. func (mem *Mempool) resCb(req tmsp.Request, res tmsp.Response) { switch res := res.(type) { - case tmsp.ResponseAppendTx: - reqAppendTx := req.(tmsp.RequestAppendTx) - if mem.expected == nil { // Normal operation - if res.RetCode == tmsp.RetCodeOK { - mem.counter++ - memTx := &mempoolTx{ - counter: mem.counter, - height: int64(mem.height), - tx: reqAppendTx.TxBytes, - } - mem.txs.PushBack(memTx) - } else { - // ignore bad transaction - // TODO: handle other retcodes + case tmsp.ResponseCheckTx: + reqCheckTx := req.(tmsp.RequestCheckTx) + if res.RetCode == tmsp.RetCodeOK { + mem.counter++ + memTx := &mempoolTx{ + counter: mem.counter, + height: int64(mem.height), + tx: reqCheckTx.TxBytes, } - } else { // During Update() - // TODO Log sane warning if mem.expected is nil. - memTx := mem.expected.Value.(*mempoolTx) - if !bytes.Equal(reqAppendTx.TxBytes, memTx.tx) { - PanicSanity("Unexpected tx response from proxy") - } - if res.RetCode == tmsp.RetCodeOK { - // Good, nothing to do. - } else { - // TODO: handle other retcodes - // Tx became invalidated due to newly committed block. - // NOTE: Concurrent traversal of mem.txs via CElement.Next() still works. - mem.txs.Remove(mem.expected) - mem.expected.DetachPrev() - } - mem.expected = mem.expected.Next() + mem.txs.PushBack(memTx) + } else { + // ignore bad transaction + // TODO: handle other retcodes } default: // ignore other messages } } -// Get the valid transactions run so far, and the hash of -// the application state that results from those transactions. -func (mem *Mempool) Reap() ([]types.Tx, []byte, error) { +// Get the valid transactions remaining +func (mem *Mempool) Reap() ([]types.Tx, error) { mem.proxyMtx.Lock() defer mem.proxyMtx.Unlock() - // First, get the hash of txs run so far - hash, err := mem.proxyAppCtx.GetHashSync() - if err != nil { - return nil, nil, err - } - - // And collect all the transactions. txs := mem.collectTxs() - return txs, hash, nil + return txs, nil } func (mem *Mempool) collectTxs() []types.Tx { @@ -171,54 +141,29 @@ func (mem *Mempool) collectTxs() []types.Tx { return txs } -// "block" is the new block that was committed. -// Txs that are present in "block" are discarded from mempool. +// Tell mempool that these txs were committed. +// Mempool will discard these txs. // NOTE: this should be called *after* block is committed by consensus. -// CONTRACT: block is valid and next in sequence. -func (mem *Mempool) Update(block *types.Block) error { +func (mem *Mempool) Update(height int, txs []types.Tx) error { mem.proxyMtx.Lock() defer mem.proxyMtx.Unlock() - // Rollback mempool synchronously - // TODO: test that proxyAppCtx's state matches the block's - err := mem.proxyAppCtx.RollbackSync() - if err != nil { - return err + // First, create a lookup map of txns in new txs. + txsMap := make(map[string]struct{}) + for _, tx := range txs { + txsMap[string(tx)] = struct{}{} } - // First, create a lookup map of txns in new block. - blockTxsMap := make(map[string]struct{}) - for _, tx := range block.Data.Txs { - blockTxsMap[string(tx)] = struct{}{} - } - - // Remove transactions that are already in block. - // Return the remaining potentially good txs. - goodTxs := mem.filterTxs(block.Height, blockTxsMap) - - // Set height and expected - mem.height = block.Height - mem.expected = mem.txs.Front() - - // Push good txs to proxyAppCtx - // NOTE: resCb() may be called concurrently. - for _, tx := range goodTxs { - mem.proxyAppCtx.AppendTxAsync(tx) - if err := mem.proxyAppCtx.Error(); err != nil { - return err - } - } + // Set height + mem.height = height - // NOTE: Even though we return immediately without e.g. - // calling mem.proxyAppCtx.FlushSync(), - // New mempool txs will still have to wait until - // all goodTxs are re-processed. - // So we could make synchronous calls here to proxyAppCtx. + // Remove transactions that are already in txs. + mem.filterTxs(txsMap) return nil } -func (mem *Mempool) filterTxs(height int, blockTxsMap map[string]struct{}) []types.Tx { +func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx { goodTxs := make([]types.Tx, 0, mem.txs.Len()) for e := mem.txs.Front(); e != nil; e = e.Next() { memTx := e.Value.(*mempoolTx) @@ -229,7 +174,6 @@ func (mem *Mempool) filterTxs(height int, blockTxsMap map[string]struct{}) []typ continue } // Good tx! - atomic.StoreInt64(&memTx.height, int64(height)) goodTxs = append(goodTxs, memTx.tx) } return goodTxs diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index bdf326fa46f..ab0925efc42 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -2,6 +2,7 @@ package mempool import ( "encoding/binary" + "sync" "testing" "github.com/tendermint/tendermint/proxy" @@ -13,13 +14,11 @@ import ( func TestSerialReap(t *testing.T) { app := example.NewCounterApplication(true) - appCtxMempool := app.Open() - proxyAppCtx := proxy.NewLocalAppContext(appCtxMempool) - mempool := NewMempool(proxyAppCtx) - - // Create another AppContext for committing. - appCtxConsensus := app.Open() - appCtxConsensus.SetOption("serial", "on") + app.SetOption("serial", "on") + mtx := new(sync.Mutex) + appConnMem := proxy.NewLocalAppConn(mtx, app) + appConnCon := proxy.NewLocalAppConn(mtx, app) + mempool := NewMempool(appConnMem) appendTxsRange := func(start, end int) { // Append some txs. @@ -28,24 +27,24 @@ func TestSerialReap(t *testing.T) { // This will succeed txBytes := make([]byte, 32) binary.LittleEndian.PutUint64(txBytes, uint64(i)) - err := mempool.AppendTx(txBytes) + err := mempool.CheckTx(txBytes) if err != nil { - t.Fatal("Error after AppendTx: %v", err) + t.Fatal("Error after CheckTx: %v", err) } // This will fail because not serial (incrementing) // However, error should still be nil. // It just won't show up on Reap(). - err = mempool.AppendTx(txBytes) + err = mempool.CheckTx(txBytes) if err != nil { - t.Fatal("Error after AppendTx: %v", err) + t.Fatal("Error after CheckTx: %v", err) } } } reapCheck := func(exp int) { - txs, _, err := mempool.Reap() + txs, err := mempool.Reap() if err != nil { t.Error("Error in mempool.Reap()", err) } @@ -61,10 +60,7 @@ func TestSerialReap(t *testing.T) { binary.LittleEndian.PutUint64(txBytes, uint64(i)) txs = append(txs, txBytes) } - blockHeader := &types.Header{Height: 0} - blockData := &types.Data{Txs: txs} - block := &types.Block{Header: blockHeader, Data: blockData} - err := mempool.Update(block) + err := mempool.Update(0, txs) if err != nil { t.Error("Error in mempool.Update()", err) } @@ -75,12 +71,12 @@ func TestSerialReap(t *testing.T) { for i := start; i < end; i++ { txBytes := make([]byte, 32) binary.LittleEndian.PutUint64(txBytes, uint64(i)) - _, retCode := appCtxConsensus.AppendTx(txBytes) + _, retCode := appConnCon.AppendTx(txBytes) if retCode != tmsp.RetCodeOK { t.Error("Error committing tx", retCode) } } - retCode := appCtxConsensus.Commit() + _, retCode := appConnCon.GetHash() if retCode != tmsp.RetCodeOK { t.Error("Error committing range", retCode) } @@ -97,7 +93,7 @@ func TestSerialReap(t *testing.T) { // Reap again. We should get the same amount reapCheck(100) - // Append 0 to 999, we should reap 900 txs + // Append 0 to 999, we should reap 900 new txs // because 100 were already counted. appendTxsRange(0, 1000) @@ -107,11 +103,16 @@ func TestSerialReap(t *testing.T) { // Reap again. We should get the same amount reapCheck(1000) - // Commit from the conensus AppContext + // Commit from the conensus AppConn commitRange(0, 500) updateRange(0, 500) // We should have 500 left. reapCheck(500) + // Append 100 invalid txs and 100 valid txs + appendTxsRange(900, 1100) + + // We should have 600 now. + reapCheck(600) } diff --git a/mempool/reactor.go b/mempool/reactor.go index ec0a2fbc0c5..a9de9726620 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -67,7 +67,7 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { switch msg := msg.(type) { case *TxMessage: - err := memR.Mempool.AppendTx(msg.Tx) + err := memR.Mempool.CheckTx(msg.Tx) if err != nil { // Bad, seen, or conflicting tx. log.Info("Could not add tx", "tx", msg.Tx) @@ -81,9 +81,9 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { } } -// Just an alias for AppendTx since broadcasting happens in peer routines +// Just an alias for CheckTx since broadcasting happens in peer routines func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error { - return memR.Mempool.AppendTx(tx) + return memR.Mempool.CheckTx(tx) } type PeerState interface { diff --git a/node/node.go b/node/node.go index 539e315dc65..9f64bfa8262 100644 --- a/node/node.go +++ b/node/node.go @@ -49,11 +49,11 @@ func NewNode() *Node { // Get State state := getState() - // Create two proxyAppCtx connections, + // Create two proxyAppConn connections, // one for the consensus and one for the mempool. proxyAddr := config.GetString("proxy_app") - proxyAppCtxMempool := getProxyApp(proxyAddr, state.LastAppHash) - proxyAppCtxConsensus := getProxyApp(proxyAddr, state.LastAppHash) + proxyAppConnMempool := getProxyApp(proxyAddr, state.AppHash) + proxyAppConnConsensus := getProxyApp(proxyAddr, state.AppHash) // add the chainid to the global config config.Set("chain_id", state.ChainID) @@ -73,14 +73,14 @@ func NewNode() *Node { } // Make BlockchainReactor - bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyAppCtxConsensus, blockStore, config.GetBool("fast_sync")) + bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyAppConnConsensus, blockStore, config.GetBool("fast_sync")) // Make MempoolReactor - mempool := mempl.NewMempool(proxyAppCtxMempool) + mempool := mempl.NewMempool(proxyAppConnMempool) mempoolReactor := mempl.NewMempoolReactor(mempool) // Make ConsensusReactor - consensusState := consensus.NewConsensusState(state.Copy(), proxyAppCtxConsensus, blockStore, mempool) + consensusState := consensus.NewConsensusState(state.Copy(), proxyAppConnConsensus, blockStore, mempool) consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, config.GetBool("fast_sync")) if privValidator != nil { consensusReactor.SetPrivValidator(privValidator) @@ -315,25 +315,25 @@ func getState() *sm.State { return state } -// Get a connection to the proxyAppCtx addr. +// Get a connection to the proxyAppConn addr. // Check the current hash, and panic if it doesn't match. -func getProxyApp(addr string, hash []byte) proxy.AppContext { +func getProxyApp(addr string, hash []byte) proxy.AppConn { proxyConn, err := Connect(addr) if err != nil { Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) } - proxyAppCtx := proxy.NewRemoteAppContext(proxyConn, 1024) + proxyAppConn := proxy.NewRemoteAppConn(proxyConn, 1024) - proxyAppCtx.Start() + proxyAppConn.Start() // Check the hash - currentHash, err := proxyAppCtx.GetHashSync() + currentHash, err := proxyAppConn.GetHashSync() if err != nil { - PanicCrisis(Fmt("Error in getting proxyAppCtx hash: %v", err)) + PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", err)) } if !bytes.Equal(hash, currentHash) { PanicCrisis(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, currentHash)) } - return proxyAppCtx + return proxyAppConn } diff --git a/proxy/app_context.go b/proxy/app_conn.go similarity index 81% rename from proxy/app_context.go rename to proxy/app_conn.go index 85ad1367c30..3b98eb60806 100644 --- a/proxy/app_context.go +++ b/proxy/app_conn.go @@ -6,16 +6,15 @@ import ( type Callback func(tmsp.Request, tmsp.Response) -type AppContext interface { +type AppConn interface { SetResponseCallback(Callback) Error() error EchoAsync(msg string) FlushAsync() AppendTxAsync(tx []byte) + CheckTxAsync(tx []byte) GetHashAsync() - CommitAsync() - RollbackAsync() SetOptionAsync(key string, value string) AddListenerAsync(key string) RemListenerAsync(key string) @@ -23,6 +22,4 @@ type AppContext interface { InfoSync() (info []string, err error) FlushSync() error GetHashSync() (hash []byte, err error) - CommitSync() error - RollbackSync() error } diff --git a/proxy/local_app_conn.go b/proxy/local_app_conn.go new file mode 100644 index 00000000000..680bb764f28 --- /dev/null +++ b/proxy/local_app_conn.go @@ -0,0 +1,128 @@ +package proxy + +import ( + tmsp "github.com/tendermint/tmsp/types" + "sync" +) + +type localAppConn struct { + mtx *sync.Mutex + tmsp.Application + Callback +} + +func NewLocalAppConn(mtx *sync.Mutex, app tmsp.Application) *localAppConn { + return &localAppConn{ + mtx: mtx, + Application: app, + } +} + +func (app *localAppConn) SetResponseCallback(cb Callback) { + app.mtx.Lock() + defer app.mtx.Unlock() + app.Callback = cb +} + +// TODO: change tmsp.Application to include Error()? +func (app *localAppConn) Error() error { + return nil +} + +func (app *localAppConn) EchoAsync(msg string) { + app.mtx.Lock() + msg2 := app.Application.Echo(msg) + app.mtx.Unlock() + app.Callback( + tmsp.RequestEcho{msg}, + tmsp.ResponseEcho{msg2}, + ) +} + +func (app *localAppConn) FlushAsync() { + // Do nothing +} + +func (app *localAppConn) SetOptionAsync(key string, value string) { + app.mtx.Lock() + retCode := app.Application.SetOption(key, value) + app.mtx.Unlock() + app.Callback( + tmsp.RequestSetOption{key, value}, + tmsp.ResponseSetOption{retCode}, + ) +} + +func (app *localAppConn) AppendTxAsync(tx []byte) { + app.mtx.Lock() + events, retCode := app.Application.AppendTx(tx) + app.mtx.Unlock() + app.Callback( + tmsp.RequestAppendTx{tx}, + tmsp.ResponseAppendTx{retCode}, + ) + for _, event := range events { + app.Callback( + nil, + tmsp.ResponseEvent{event}, + ) + } +} + +func (app *localAppConn) CheckTxAsync(tx []byte) { + app.mtx.Lock() + retCode := app.Application.CheckTx(tx) + app.mtx.Unlock() + app.Callback( + tmsp.RequestCheckTx{tx}, + tmsp.ResponseCheckTx{retCode}, + ) +} + +func (app *localAppConn) GetHashAsync() { + app.mtx.Lock() + hash, retCode := app.Application.GetHash() + app.mtx.Unlock() + app.Callback( + tmsp.RequestGetHash{}, + tmsp.ResponseGetHash{retCode, hash}, + ) +} + +func (app *localAppConn) AddListenerAsync(key string) { + app.mtx.Lock() + retCode := app.Application.AddListener(key) + app.mtx.Unlock() + app.Callback( + tmsp.RequestAddListener{key}, + tmsp.ResponseAddListener{retCode}, + ) +} + +func (app *localAppConn) RemListenerAsync(key string) { + app.mtx.Lock() + retCode := app.Application.RemListener(key) + app.mtx.Unlock() + app.Callback( + tmsp.RequestRemListener{key}, + tmsp.ResponseRemListener{retCode}, + ) +} + +func (app *localAppConn) InfoSync() (info []string, err error) { + app.mtx.Lock() + info = app.Application.Info() + app.mtx.Unlock() + return info, nil +} + +func (app *localAppConn) FlushSync() error { + return nil +} + +func (app *localAppConn) GetHashSync() (hash []byte, err error) { + app.mtx.Lock() + hash, retCode := app.Application.GetHash() + app.mtx.Unlock() + return hash, retCode.Error() +} diff --git a/proxy/local_app_context.go b/proxy/local_app_context.go deleted file mode 100644 index b6d1b26782d..00000000000 --- a/proxy/local_app_context.go +++ /dev/null @@ -1,123 +0,0 @@ -package proxy - -import ( - tmsp "github.com/tendermint/tmsp/types" -) - -type localAppContext struct { - tmsp.AppContext - Callback -} - -func NewLocalAppContext(app tmsp.AppContext) *localAppContext { - return &localAppContext{ - AppContext: app, - } -} - -func (app *localAppContext) SetResponseCallback(cb Callback) { - app.Callback = cb -} - -// TODO: change tmsp.AppContext to include Error()? -func (app *localAppContext) Error() error { - return nil -} - -func (app *localAppContext) EchoAsync(msg string) { - msg2 := app.AppContext.Echo(msg) - app.Callback( - tmsp.RequestEcho{msg}, - tmsp.ResponseEcho{msg2}, - ) -} - -func (app *localAppContext) FlushAsync() { - // Do nothing -} - -func (app *localAppContext) SetOptionAsync(key string, value string) { - retCode := app.AppContext.SetOption(key, value) - app.Callback( - tmsp.RequestSetOption{key, value}, - tmsp.ResponseSetOption{retCode}, - ) -} - -func (app *localAppContext) AppendTxAsync(tx []byte) { - events, retCode := app.AppContext.AppendTx(tx) - app.Callback( - tmsp.RequestAppendTx{tx}, - tmsp.ResponseAppendTx{retCode}, - ) - for _, event := range events { - app.Callback( - nil, - tmsp.ResponseEvent{event}, - ) - } -} - -func (app *localAppContext) GetHashAsync() { - hash, retCode := app.AppContext.GetHash() - app.Callback( - tmsp.RequestGetHash{}, - tmsp.ResponseGetHash{retCode, hash}, - ) -} - -func (app *localAppContext) CommitAsync() { - retCode := app.AppContext.Commit() - app.Callback( - tmsp.RequestCommit{}, - tmsp.ResponseCommit{retCode}, - ) -} - -func (app *localAppContext) RollbackAsync() { - retCode := app.AppContext.Rollback() - app.Callback( - tmsp.RequestRollback{}, - tmsp.ResponseRollback{retCode}, - ) -} - -func (app *localAppContext) AddListenerAsync(key string) { - retCode := app.AppContext.AddListener(key) - app.Callback( - tmsp.RequestAddListener{key}, - tmsp.ResponseAddListener{retCode}, - ) -} - -func (app *localAppContext) RemListenerAsync(key string) { - retCode := app.AppContext.RemListener(key) - app.Callback( - tmsp.RequestRemListener{key}, - tmsp.ResponseRemListener{retCode}, - ) -} - -func (app *localAppContext) InfoSync() (info []string, err error) { - info = app.AppContext.Info() - return info, nil -} - -func (app *localAppContext) FlushSync() error { - return nil -} - -func (app *localAppContext) GetHashSync() (hash []byte, err error) { - hash, retCode := app.AppContext.GetHash() - return hash, retCode.Error() -} - -func (app *localAppContext) CommitSync() (err error) { - retCode := app.AppContext.Commit() - return retCode.Error() -} - -func (app *localAppContext) RollbackSync() (err error) { - retCode := app.AppContext.Rollback() - return retCode.Error() -} diff --git a/proxy/remote_app_context.go b/proxy/remote_app_conn.go similarity index 70% rename from proxy/remote_app_context.go rename to proxy/remote_app_conn.go index 9bd0325c001..a1dff9a8409 100644 --- a/proxy/remote_app_context.go +++ b/proxy/remote_app_conn.go @@ -15,15 +15,17 @@ import ( ) const maxResponseSize = 1048576 // 1MB +const flushThrottleMS = 20 // Don't wait longer than... // This is goroutine-safe, but users should beware that // the application in general is not meant to be interfaced // with concurrent callers. -type remoteAppContext struct { +type remoteAppConn struct { QuitService sync.Mutex // [EB]: is this even used? - reqQueue chan *reqRes + reqQueue chan *reqRes + flushTimer *ThrottleTimer mtx sync.Mutex conn net.Conn @@ -33,39 +35,42 @@ type remoteAppContext struct { resCb func(tmsp.Request, tmsp.Response) } -func NewRemoteAppContext(conn net.Conn, bufferSize int) *remoteAppContext { - app := &remoteAppContext{ - reqQueue: make(chan *reqRes, bufferSize), +func NewRemoteAppConn(conn net.Conn, bufferSize int) *remoteAppConn { + app := &remoteAppConn{ + reqQueue: make(chan *reqRes, bufferSize), + flushTimer: NewThrottleTimer("remoteAppConn", flushThrottleMS), + conn: conn, bufWriter: bufio.NewWriter(conn), reqSent: list.New(), resCb: nil, } - app.QuitService = *NewQuitService(nil, "remoteAppContext", app) + app.QuitService = *NewQuitService(nil, "remoteAppConn", app) return app } -func (app *remoteAppContext) OnStart() error { +func (app *remoteAppConn) OnStart() error { app.QuitService.OnStart() go app.sendRequestsRoutine() go app.recvResponseRoutine() return nil } -func (app *remoteAppContext) OnStop() { +func (app *remoteAppConn) OnStop() { app.QuitService.OnStop() app.conn.Close() } -func (app *remoteAppContext) SetResponseCallback(resCb Callback) { +// NOTE: callback may get internally generated flush responses. +func (app *remoteAppConn) SetResponseCallback(resCb Callback) { app.mtx.Lock() defer app.mtx.Unlock() app.resCb = resCb } -func (app *remoteAppContext) StopForError(err error) { +func (app *remoteAppConn) StopForError(err error) { app.mtx.Lock() - log.Error("Stopping remoteAppContext for error.", "error", err) + log.Error("Stopping remoteAppConn for error.", "error", err) if app.err == nil { app.err = err } @@ -73,7 +78,7 @@ func (app *remoteAppContext) StopForError(err error) { app.Stop() } -func (app *remoteAppContext) Error() error { +func (app *remoteAppConn) Error() error { app.mtx.Lock() defer app.mtx.Unlock() return app.err @@ -81,17 +86,21 @@ func (app *remoteAppContext) Error() error { //---------------------------------------- -func (app *remoteAppContext) sendRequestsRoutine() { +func (app *remoteAppConn) sendRequestsRoutine() { for { var n int var err error select { + case <-app.flushTimer.Ch: + select { + case app.reqQueue <- newReqRes(tmsp.RequestFlush{}): + default: + // Probably will fill the buffer, or retry later. + } case <-app.QuitService.Quit: return case reqres := <-app.reqQueue: - app.willSendReq(reqres) - wire.WriteBinaryLengthPrefixed(struct{ tmsp.Request }{reqres.Request}, app.bufWriter, &n, &err) // Length prefix if err != nil { app.StopForError(err) @@ -109,7 +118,7 @@ func (app *remoteAppContext) sendRequestsRoutine() { } } -func (app *remoteAppContext) recvResponseRoutine() { +func (app *remoteAppConn) recvResponseRoutine() { r := bufio.NewReader(app.conn) // Buffer reads for { var res tmsp.Response @@ -133,13 +142,13 @@ func (app *remoteAppContext) recvResponseRoutine() { } } -func (app *remoteAppContext) willSendReq(reqres *reqRes) { +func (app *remoteAppConn) willSendReq(reqres *reqRes) { app.mtx.Lock() defer app.mtx.Unlock() app.reqSent.PushBack(reqres) } -func (app *remoteAppContext) didRecvResponse(res tmsp.Response) error { +func (app *remoteAppConn) didRecvResponse(res tmsp.Response) error { app.mtx.Lock() defer app.mtx.Unlock() @@ -174,45 +183,41 @@ func (app *remoteAppContext) didRecvResponse(res tmsp.Response) error { //---------------------------------------- -func (app *remoteAppContext) EchoAsync(msg string) { +func (app *remoteAppConn) EchoAsync(msg string) { app.queueRequest(tmsp.RequestEcho{msg}) } -func (app *remoteAppContext) FlushAsync() { +func (app *remoteAppConn) FlushAsync() { app.queueRequest(tmsp.RequestFlush{}) } -func (app *remoteAppContext) SetOptionAsync(key string, value string) { +func (app *remoteAppConn) SetOptionAsync(key string, value string) { app.queueRequest(tmsp.RequestSetOption{key, value}) } -func (app *remoteAppContext) AppendTxAsync(tx []byte) { +func (app *remoteAppConn) AppendTxAsync(tx []byte) { app.queueRequest(tmsp.RequestAppendTx{tx}) } -func (app *remoteAppContext) GetHashAsync() { - app.queueRequest(tmsp.RequestGetHash{}) +func (app *remoteAppConn) CheckTxAsync(tx []byte) { + app.queueRequest(tmsp.RequestCheckTx{tx}) } -func (app *remoteAppContext) CommitAsync() { - app.queueRequest(tmsp.RequestCommit{}) -} - -func (app *remoteAppContext) RollbackAsync() { - app.queueRequest(tmsp.RequestRollback{}) +func (app *remoteAppConn) GetHashAsync() { + app.queueRequest(tmsp.RequestGetHash{}) } -func (app *remoteAppContext) AddListenerAsync(key string) { +func (app *remoteAppConn) AddListenerAsync(key string) { app.queueRequest(tmsp.RequestAddListener{key}) } -func (app *remoteAppContext) RemListenerAsync(key string) { +func (app *remoteAppConn) RemListenerAsync(key string) { app.queueRequest(tmsp.RequestRemListener{key}) } //---------------------------------------- -func (app *remoteAppContext) InfoSync() (info []string, err error) { +func (app *remoteAppConn) InfoSync() (info []string, err error) { reqres := app.queueRequest(tmsp.RequestInfo{}) app.FlushSync() if app.err != nil { @@ -221,12 +226,12 @@ func (app *remoteAppContext) InfoSync() (info []string, err error) { return reqres.Response.(tmsp.ResponseInfo).Data, nil } -func (app *remoteAppContext) FlushSync() error { +func (app *remoteAppConn) FlushSync() error { app.queueRequest(tmsp.RequestFlush{}).Wait() return app.err } -func (app *remoteAppContext) GetHashSync() (hash []byte, err error) { +func (app *remoteAppConn) GetHashSync() (hash []byte, err error) { reqres := app.queueRequest(tmsp.RequestGetHash{}) app.FlushSync() if app.err != nil { @@ -235,27 +240,21 @@ func (app *remoteAppContext) GetHashSync() (hash []byte, err error) { return reqres.Response.(tmsp.ResponseGetHash).Hash, nil } -// Commits or error -func (app *remoteAppContext) CommitSync() (err error) { - app.queueRequest(tmsp.RequestCommit{}) - app.FlushSync() - return app.err -} - -// Rollback or error -// Clears internal buffers -func (app *remoteAppContext) RollbackSync() (err error) { - app.queueRequest(tmsp.RequestRollback{}) - app.FlushSync() - return app.err -} - //---------------------------------------- -func (app *remoteAppContext) queueRequest(req tmsp.Request) *reqRes { - reqres := NewreqRes(req) +func (app *remoteAppConn) queueRequest(req tmsp.Request) *reqRes { + reqres := newReqRes(req) // TODO: set app.err if reqQueue times out app.reqQueue <- reqres + + // Maybe auto-flush, or unset auto-flush + switch req.(type) { + case tmsp.RequestFlush: + app.flushTimer.Unset() + default: + app.flushTimer.Set() + } + return reqres } @@ -273,12 +272,10 @@ func resMatchesReq(req tmsp.Request, res tmsp.Response) (ok bool) { _, ok = res.(tmsp.ResponseSetOption) case tmsp.RequestAppendTx: _, ok = res.(tmsp.ResponseAppendTx) + case tmsp.RequestCheckTx: + _, ok = res.(tmsp.ResponseCheckTx) case tmsp.RequestGetHash: _, ok = res.(tmsp.ResponseGetHash) - case tmsp.RequestCommit: - _, ok = res.(tmsp.ResponseCommit) - case tmsp.RequestRollback: - _, ok = res.(tmsp.ResponseRollback) case tmsp.RequestAddListener: _, ok = res.(tmsp.ResponseAddListener) case tmsp.RequestRemListener: @@ -295,7 +292,7 @@ type reqRes struct { tmsp.Response // Not set atomically, so be sure to use WaitGroup. } -func NewreqRes(req tmsp.Request) *reqRes { +func newReqRes(req tmsp.Request) *reqRes { return &reqRes{ Request: req, WaitGroup: waitGroup1(), diff --git a/proxy/remote_app_context_test.go b/proxy/remote_app_conn_test.go similarity index 94% rename from proxy/remote_app_context_test.go rename to proxy/remote_app_conn_test.go index 1242b6c8311..6b58c986ae7 100644 --- a/proxy/remote_app_context_test.go +++ b/proxy/remote_app_conn_test.go @@ -26,7 +26,7 @@ func TestEcho(t *testing.T) { logBuffer := bytes.NewBuffer(nil) logConn := logio.NewLoggedConn(conn, logBuffer) - proxy := NewRemoteAppContext(logConn, 10) + proxy := NewRemoteAppConn(logConn, 10) proxy.SetResponseCallback(nil) proxy.Start() @@ -56,7 +56,7 @@ func BenchmarkEcho(b *testing.B) { b.Log("Connected") } - proxy := NewRemoteAppContext(conn, 10) + proxy := NewRemoteAppConn(conn, 10) proxy.Start() echoString := strings.Repeat(" ", 200) b.StartTimer() // Start benchmarking tests @@ -86,7 +86,7 @@ func TestInfo(t *testing.T) { logBuffer := bytes.NewBuffer(nil) logConn := logio.NewLoggedConn(conn, logBuffer) - proxy := NewRemoteAppContext(logConn, 10) + proxy := NewRemoteAppConn(logConn, 10) proxy.Start() data, err := proxy.InfoSync() if err != nil { diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index fa294f71904..8df0cc01393 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -18,6 +18,6 @@ func BroadcastTx(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { } func ListUnconfirmedTxs() (*ctypes.ResultListUnconfirmedTxs, error) { - txs, _, err := mempoolReactor.Mempool.Reap() + txs, err := mempoolReactor.Mempool.Reap() return &ctypes.ResultListUnconfirmedTxs{len(txs), txs}, err } diff --git a/state/execution.go b/state/execution.go index 2b4d57e6ade..1578af4b3ec 100644 --- a/state/execution.go +++ b/state/execution.go @@ -1,21 +1,24 @@ package state import ( - "bytes" "errors" "fmt" . "github.com/tendermint/go-common" + "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" tmsp "github.com/tendermint/tmsp/types" ) +// Validate block +func (s *State) ValidateBlock(block *types.Block) error { + return s.validateBlock(block) +} + // Execute the block to mutate State. -// Also, execute txs on the proxyAppCtx and validate apphash -// Rolls back before executing transactions. -// Rolls back if invalid, but never commits. -func (s *State) ExecBlock(proxyAppCtx proxy.AppContext, block *types.Block, blockPartsHeader types.PartSetHeader) error { +// Validates block and then executes Data.Txs in the block. +func (s *State) ExecBlock(evsw *events.EventSwitch, proxyAppConn proxy.AppConn, block *types.Block, blockPartsHeader types.PartSetHeader) error { // Validate the block. err := s.validateBlock(block) @@ -30,81 +33,74 @@ func (s *State) ExecBlock(proxyAppCtx proxy.AppContext, block *types.Block, bloc // TODO: Update the validator set (e.g. block.Data.ValidatorUpdates?) nextValSet := valSet.Copy() - // First, rollback. - proxyAppCtx.RollbackSync() - - // Execute, or rollback. (Does not commit) - err = s.execBlockOnProxyApp(proxyAppCtx, block) + // Execute the block txs + err = s.execBlockOnProxyApp(evsw, proxyAppConn, block) if err != nil { - proxyAppCtx.RollbackSync() + // There was some error in proxyApp + // TODO Report error and wait for proxyApp to be available. return err } // All good! nextValSet.IncrementAccum(1) - s.Validators = nextValSet - s.LastValidators = valSet - s.LastAppHash = block.AppHash s.LastBlockHeight = block.Height s.LastBlockHash = block.Hash() s.LastBlockParts = blockPartsHeader s.LastBlockTime = block.Time + s.Validators = nextValSet + s.LastValidators = valSet return nil } -// Commits block on proxyAppCtx. -func (s *State) Commit(proxyAppCtx proxy.AppContext) error { - err := proxyAppCtx.CommitSync() - return err -} +// Executes block's transactions on proxyAppConn. +// TODO: Generate a bitmap or otherwise store tx validity in state. +func (s *State) execBlockOnProxyApp(evsw *events.EventSwitch, proxyAppConn proxy.AppConn, block *types.Block) error { + + var validTxs, invalidTxs = 0, 0 -// Executes transactions on proxyAppCtx. -func (s *State) execBlockOnProxyApp(proxyAppCtx proxy.AppContext, block *types.Block) error { // Execute transactions and get hash - var invalidTxErr error proxyCb := func(req tmsp.Request, res tmsp.Response) { switch res := res.(type) { case tmsp.ResponseAppendTx: - reqAppendTx := req.(tmsp.RequestAppendTx) - if res.RetCode != tmsp.RetCodeOK { - if invalidTxErr == nil { - invalidTxErr = InvalidTxError{reqAppendTx.TxBytes, res.RetCode} - } + // TODO: make use of this info + // Blocks may include invalid txs. + // reqAppendTx := req.(tmsp.RequestAppendTx) + if res.RetCode == tmsp.RetCodeOK { + validTxs += 1 + } else { + invalidTxs += 1 } case tmsp.ResponseEvent: - s.evc.FireEvent(types.EventStringApp(), types.EventDataApp{res.Key, res.Data}) + // TODO: some events should get stored in the blockchain. + evsw.FireEvent(types.EventStringApp(), types.EventDataApp{res.Key, res.Data}) } } - proxyAppCtx.SetResponseCallback(proxyCb) - for _, tx := range block.Data.Txs { - proxyAppCtx.AppendTxAsync(tx) - if err := proxyAppCtx.Error(); err != nil { + proxyAppConn.SetResponseCallback(proxyCb) + + // Run next txs in the block and get new AppHash + for _, tx := range block.Txs { + proxyAppConn.AppendTxAsync(tx) + if err := proxyAppConn.Error(); err != nil { return err } } - hash, err := proxyAppCtx.GetHashSync() + hash, err := proxyAppConn.GetHashSync() if err != nil { - log.Warn("Error computing proxyAppCtx hash", "error", err) + log.Warn("Error computing proxyAppConn hash", "error", err) return err } - if invalidTxErr != nil { - log.Warn("Invalid transaction in block") - return invalidTxErr - } + log.Info("ExecBlock got %v valid txs and %v invalid txs", validTxs, invalidTxs) - // Check that appHash matches - if !bytes.Equal(block.AppHash, hash) { - log.Warn(Fmt("App hash in proposal was %X, computed %X instead", block.AppHash, hash)) - return InvalidAppHashError{block.AppHash, hash} - } + // Set the state's new AppHash + s.AppHash = hash return nil } func (s *State) validateBlock(block *types.Block) error { // Basic block validation. - err := block.ValidateBasic(s.ChainID, s.LastBlockHeight, s.LastBlockHash, s.LastBlockParts, s.LastBlockTime) + err := block.ValidateBasic(s.ChainID, s.LastBlockHeight, s.LastBlockHash, s.LastBlockParts, s.LastBlockTime, s.AppHash) if err != nil { return err } @@ -130,7 +126,7 @@ func (s *State) validateBlock(block *types.Block) error { } // Updates the LastCommitHeight of the validators in valSet, in place. -// Assumes that lastValSet matches the valset of block.LastValidators +// Assumes that lastValSet matches the valset of block.LastValidation // CONTRACT: lastValSet is not mutated. func updateValidatorsWithBlock(lastValSet *types.ValidatorSet, valSet *types.ValidatorSet, block *types.Block) { @@ -167,12 +163,3 @@ type InvalidTxError struct { func (txErr InvalidTxError) Error() string { return Fmt("Invalid tx: [%v] code: [%v]", txErr.Tx, txErr.RetCode) } - -type InvalidAppHashError struct { - Expected []byte - Got []byte -} - -func (hashErr InvalidAppHashError) Error() string { - return Fmt("Invalid hash: [%X] got: [%X]", hashErr.Expected, hashErr.Got) -} diff --git a/state/state.go b/state/state.go index 9523a64975a..798e8ce7235 100644 --- a/state/state.go +++ b/state/state.go @@ -9,7 +9,6 @@ import ( . "github.com/tendermint/go-common" dbm "github.com/tendermint/go-db" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/types" ) @@ -25,15 +24,13 @@ type State struct { db dbm.DB GenesisDoc *types.GenesisDoc ChainID string - LastBlockHeight int + LastBlockHeight int // Genesis state has this set to 0. So, Block(H=0) does not exist. LastBlockHash []byte LastBlockParts types.PartSetHeader LastBlockTime time.Time Validators *types.ValidatorSet LastValidators *types.ValidatorSet - LastAppHash []byte - - evc *events.EventCache + AppHash []byte } func LoadState(db dbm.DB) *State { @@ -64,8 +61,7 @@ func (s *State) Copy() *State { LastBlockTime: s.LastBlockTime, Validators: s.Validators.Copy(), LastValidators: s.LastValidators.Copy(), - LastAppHash: s.LastAppHash, - evc: nil, + AppHash: s.AppHash, } } @@ -81,13 +77,6 @@ func (s *State) Save() { s.db.Set(stateKey, buf.Bytes()) } -func (s *State) SetEventCache(evc *events.EventCache) { - s.mtx.Lock() - defer s.mtx.Unlock() - - s.evc = evc -} - //----------------------------------------------------------------------------- // Genesis @@ -133,6 +122,6 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) *State { LastBlockTime: genDoc.GenesisTime, Validators: types.NewValidatorSet(validators), LastValidators: types.NewValidatorSet(nil), - LastAppHash: genDoc.AppHash, + AppHash: genDoc.AppHash, } } diff --git a/types/block.go b/types/block.go index c3e2320300a..db88ceb5663 100644 --- a/types/block.go +++ b/types/block.go @@ -22,7 +22,7 @@ type Block struct { // Basic validation that doesn't involve state data. func (b *Block) ValidateBasic(chainID string, lastBlockHeight int, lastBlockHash []byte, - lastBlockParts PartSetHeader, lastBlockTime time.Time) error { + lastBlockParts PartSetHeader, lastBlockTime time.Time, appHash []byte) error { if b.ChainID != chainID { return errors.New(Fmt("Wrong Block.Header.ChainID. Expected %v, got %v", chainID, b.ChainID)) } @@ -57,6 +57,9 @@ func (b *Block) ValidateBasic(chainID string, lastBlockHeight int, lastBlockHash if !bytes.Equal(b.DataHash, b.Data.Hash()) { return errors.New(Fmt("Wrong Block.Header.DataHash. Expected %X, got %X", b.DataHash, b.Data.Hash())) } + if !bytes.Equal(b.AppHash, appHash) { + return errors.New(Fmt("Wrong Block.Header.AppHash. Expected %X, got %X", appHash, b.AppHash)) + } // NOTE: the AppHash and ValidatorsHash are validated later. return nil } @@ -137,7 +140,7 @@ type Header struct { LastValidationHash []byte `json:"last_validation_hash"` DataHash []byte `json:"data_hash"` ValidatorsHash []byte `json:"validators_hash"` - AppHash []byte `json:"app_hash"` + AppHash []byte `json:"app_hash"` // state merkle root of txs from the previous block } // NOTE: hash is nil if required fields are missing. @@ -169,7 +172,7 @@ func (h *Header) StringIndented(indent string) string { %s Height: %v %s Time: %v %s Fees: %v -%s NumTxs: %v +%s NumTxs: %v %s LastBlock: %X %s LastBlockParts: %v %s LastValidation: %X @@ -326,6 +329,10 @@ func (v *Validation) StringIndented(indent string) string { //----------------------------------------------------------------------------- type Data struct { + + // Txs that will be applied by state @ block.Height+1. + // NOTE: not all txs here are valid. We're just agreeing on the order first. + // This means that block.AppHash does not include these txs. Txs []Tx `json:"txs"` // Volatile