Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websockets #52

Merged
merged 12 commits into from
Apr 17, 2015
Prev Previous commit
Next Next commit
event cache and fireable interace
  • Loading branch information
ebuchman committed Apr 16, 2015
commit d27e0bbad578945bd8ece6f0386df8c9e98b2f57
4 changes: 2 additions & 2 deletions blockchain/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type BlockchainReactor struct {
quit chan struct{}
running uint32

evsw *events.EventSwitch
evsw events.Fireable
}

func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor {
Expand Down Expand Up @@ -242,7 +242,7 @@ func (bcR *BlockchainReactor) BroadcastStatus() error {
}

// implements events.Eventable
func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) {
bcR.evsw = evsw
}

Expand Down
6 changes: 3 additions & 3 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type ConsensusReactor struct {
blockStore *bc.BlockStore
conS *ConsensusState

evsw *events.EventSwitch
evsw events.Fireable
}

func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor {
Expand Down Expand Up @@ -234,9 +234,9 @@ func (conR *ConsensusReactor) ResetToState(state *sm.State) {
}

// implements events.Eventable
func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
conR.evsw = evsw
conR.conS.SetEventSwitch(evsw)
conR.conS.SetFireable(evsw)
}

//--------------------------------------
Expand Down
18 changes: 12 additions & 6 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ type ConsensusState struct {
stagedState *sm.State // Cache result of staged block.
lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on.

evsw *events.EventSwitch
evsw events.Fireable
evc *events.EventCache // set in stageBlock and passed into state
}

func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState {
Expand Down Expand Up @@ -443,9 +444,12 @@ ACTION_LOOP:
if cs.TryFinalizeCommit(rs.Height) {
// Now at new height
// cs.Step is at RoundStepNewHeight or RoundStepNewRound.
newBlock := cs.blockStore.LoadBlock(cs.state.LastBlockHeight)
cs.evsw.FireEvent(types.EventStringNewBlock(), newBlock)
// TODO: go fire events from event cache
// fire some events!
go func() {
newBlock := cs.blockStore.LoadBlock(cs.state.LastBlockHeight)
cs.evsw.FireEvent(types.EventStringNewBlock(), newBlock)
cs.evc.Flush()
}()
scheduleNextAction()
continue ACTION_LOOP
} else {
Expand Down Expand Up @@ -1032,6 +1036,9 @@ func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartS

// Create a copy of the state for staging
stateCopy := cs.state.Copy()
// reset the event cache and pass it into the state
cs.evc = events.NewEventCache(cs.evsw)
stateCopy.SetFireable(cs.evc)

// Commit block onto the copied state.
// NOTE: Basic validation is done in state.AppendBlock().
Expand Down Expand Up @@ -1117,9 +1124,8 @@ func (cs *ConsensusState) saveCommitVoteBlock(block *types.Block, blockParts *ty
}

// implements events.Eventable
func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) {
func (cs *ConsensusState) SetFireable(evsw events.Fireable) {
cs.evsw = evsw
cs.state.SetEventSwitch(evsw)
}

//-----------------------------------------------------------------------------
Expand Down
39 changes: 39 additions & 0 deletions events/event_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package events

const (
eventsBufferSize = 1000
)

// An EventCache buffers events for a Fireable
// All events are cached. Filtering happens on Flush
type EventCache struct {
evsw Fireable
events []eventInfo
}

// Create a new EventCache with an EventSwitch as backend
func NewEventCache(evsw Fireable) *EventCache {
return &EventCache{
evsw: evsw,
events: make([]eventInfo, eventsBufferSize),
}
}

// a cached event
type eventInfo struct {
event string
msg interface{}
}

// Cache an event to be fired upon finality.
func (evc *EventCache) FireEvent(event string, msg interface{}) {
// append to list
evc.events = append(evc.events, eventInfo{event, msg})
}

// Fire events by running evsw.FireEvent on all cached events. Blocks.
func (evc *EventCache) Flush() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clear out the evc.events

for _, ei := range evc.events {
evc.evsw.FireEvent(ei.event, ei.msg)
}
}
7 changes: 6 additions & 1 deletion events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ import (
// reactors and other modules should export
// this interface to become eventable
type Eventable interface {
SetEventSwitch(*EventSwitch)
SetFireable(Fireable)
}

// an event switch or cache implements fireable
type Fireable interface {
FireEvent(event string, msg interface{})
}

type EventSwitch struct {
Expand Down
4 changes: 2 additions & 2 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (mem *Mempool) GetCache() *sm.BlockCache {
func (mem *Mempool) AddTx(tx types.Tx) (err error) {
mem.mtx.Lock()
defer mem.mtx.Unlock()
err = sm.ExecTx(mem.cache, tx, false, false)
err = sm.ExecTx(mem.cache, tx, false, nil)
if err != nil {
log.Debug("AddTx() error", "tx", tx, "error", err)
return err
Expand Down Expand Up @@ -93,7 +93,7 @@ func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) {
// Next, filter all txs that aren't valid given new state.
validTxs := []types.Tx{}
for _, tx := range txs {
err := sm.ExecTx(mem.cache, tx, false, false)
err := sm.ExecTx(mem.cache, tx, false, nil)
if err == nil {
log.Debug("Filter in, valid", "tx", tx)
validTxs = append(validTxs, tx)
Expand Down
4 changes: 2 additions & 2 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type MempoolReactor struct {

Mempool *Mempool

evsw *events.EventSwitch
evsw events.Fireable
}

func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
Expand Down Expand Up @@ -114,7 +114,7 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
}

// implements events.Eventable
func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (memR *MempoolReactor) SetFireable(evsw events.Fireable) {
memR.evsw = evsw
}

Expand Down
6 changes: 3 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewNode() *Node {

// add the event switch to all services
// they should all satisfy events.Eventable
SetEventSwitch(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)
SetFireable(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)

return &Node{
sw: sw,
Expand Down Expand Up @@ -115,9 +115,9 @@ func (n *Node) Stop() {
}

// Add the event switch to reactors, mempool, etc.
func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) {
func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) {
for _, e := range eventables {
e.SetEventSwitch(evsw)
e.SetFireable(evsw)
}
}

Expand Down
4 changes: 2 additions & 2 deletions p2p/pex_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type PEXReactor struct {

book *AddrBook

evsw *events.EventSwitch
evsw events.Fireable
}

func NewPEXReactor(book *AddrBook) *PEXReactor {
Expand Down Expand Up @@ -211,7 +211,7 @@ func (pexR *PEXReactor) ensurePeers() {
}

// implements events.Eventable
func (pexR *PEXReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (pexR *PEXReactor) SetFireable(evsw events.Fireable) {
pexR.evsw = evsw
}

Expand Down
50 changes: 23 additions & 27 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (

"github.com/tendermint/tendermint/account"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/vm"
)

// NOTE: If an error occurs during block execution, state will be left
// at an invalid state. Copy the state before calling ExecBlock!
func ExecBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader) error {
err := execBlock(s, block, blockPartsHeader, true)
err := execBlock(s, block, blockPartsHeader)
if err != nil {
return err
}
Expand All @@ -29,7 +30,7 @@ func ExecBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeade
// executes transactions of a block, does not check block.StateHash
// NOTE: If an error occurs during block execution, state will be left
// at an invalid state. Copy the state before calling execBlock!
func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader, fireEvents bool) error {
func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader) error {
// Basic block validation.
err := block.ValidateBasic(s.LastBlockHeight, s.LastBlockHash, s.LastBlockParts, s.LastBlockTime)
if err != nil {
Expand Down Expand Up @@ -111,7 +112,7 @@ func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeade

// Commit each tx
for _, tx := range block.Data.Txs {
err := ExecTx(blockCache, tx, true, fireEvents)
err := ExecTx(blockCache, tx, true, s.evc)
if err != nil {
return InvalidTxError{tx, err}
}
Expand Down Expand Up @@ -291,13 +292,11 @@ func adjustByOutputs(accounts map[string]*account.Account, outs []*types.TxOutpu

// If the tx is invalid, an error will be returned.
// Unlike ExecBlock(), state will not be altered.
func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) error {
func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool, evc events.Fireable) error {

// TODO: do something with fees
fees := uint64(0)
_s := blockCache.State() // hack to access validators and event switch.
nilSwitch := _s.evsw == nil
fireEvents = fireEvents && !nilSwitch

// Exec tx
switch tx := tx_.(type) {
Expand Down Expand Up @@ -328,16 +327,14 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro
blockCache.UpdateAccount(acc)
}

// If we're in a block (not mempool),
// fire event on all inputs and outputs
// see types/events.go for spec
if fireEvents {
// if the evc is nil, nothing will happen
if evc != nil {
for _, i := range tx.Inputs {
_s.evsw.FireEvent(types.EventStringAccInput(i.Address), tx)
evc.FireEvent(types.EventStringAccInput(i.Address), tx)
}

for _, o := range tx.Outputs {
_s.evsw.FireEvent(types.EventStringAccOutput(o.Address), tx)
evc.FireEvent(types.EventStringAccOutput(o.Address), tx)
}
}
return nil
Expand Down Expand Up @@ -427,7 +424,7 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro
txCache.UpdateAccount(caller) // because we adjusted by input above, and bumped nonce maybe.
txCache.UpdateAccount(callee) // because we adjusted by input above.
vmach := vm.NewVM(txCache, params, caller.Address, account.HashSignBytes(tx))
vmach.SetEventSwitch(_s.evsw)
vmach.SetFireable(_s.evc)
// NOTE: Call() transfers the value from caller to callee iff call succeeds.

ret, err := vmach.Call(caller, callee, code, tx.Data, value, &gas)
Expand All @@ -451,12 +448,11 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro
// Create a receipt from the ret and whether errored.
log.Info("VM call complete", "caller", caller, "callee", callee, "return", ret, "err", err)

if fireEvents {
// Fire Events for sender and receiver
// a separate event will be fired from vm for each
_s.evsw.FireEvent(types.EventStringAccInput(tx.Input.Address), types.EventMsgCallTx{tx, ret, exception})

_s.evsw.FireEvent(types.EventStringAccReceive(tx.Address), types.EventMsgCallTx{tx, ret, exception})
// Fire Events for sender and receiver
// a separate event will be fired from vm for each additional call
if evc != nil {
evc.FireEvent(types.EventStringAccInput(tx.Input.Address), types.EventMsgCallTx{tx, ret, exception})
evc.FireEvent(types.EventStringAccReceive(tx.Address), types.EventMsgCallTx{tx, ret, exception})
}
} else {
// The mempool does not call txs until
Expand Down Expand Up @@ -525,8 +521,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro
if !added {
panic("Failed to add validator")
}
if fireEvents {
_s.evsw.FireEvent(types.EventStringBond(), tx)
if evc != nil {
evc.FireEvent(types.EventStringBond(), tx)
}
return nil

Expand All @@ -550,8 +546,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro

// Good!
_s.unbondValidator(val)
if fireEvents {
_s.evsw.FireEvent(types.EventStringUnbond(), tx)
if evc != nil {
evc.FireEvent(types.EventStringUnbond(), tx)
}
return nil

Expand All @@ -575,8 +571,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro

// Good!
_s.rebondValidator(val)
if fireEvents {
_s.evsw.FireEvent(types.EventStringRebond(), tx)
if evc != nil {
evc.FireEvent(types.EventStringRebond(), tx)
}
return nil

Expand Down Expand Up @@ -621,8 +617,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro

// Good! (Bad validator!)
_s.destroyValidator(accused)
if fireEvents {
_s.evsw.FireEvent(types.EventStringDupeout(), tx)
if evc != nil {
evc.FireEvent(types.EventStringDupeout(), tx)
}
return nil

Expand Down
12 changes: 6 additions & 6 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type State struct {
accounts merkle.Tree // Shouldn't be accessed directly.
validatorInfos merkle.Tree // Shouldn't be accessed directly.

evsw *events.EventSwitch
evc events.Fireable // typically an events.EventCache
}

func LoadState(db dbm.DB) *State {
Expand Down Expand Up @@ -101,7 +101,6 @@ func (s *State) Copy() *State {
UnbondingValidators: s.UnbondingValidators.Copy(), // copy the valSet lazily.
accounts: s.accounts.Copy(),
validatorInfos: s.validatorInfos.Copy(),
evsw: s.evsw,
}
}

Expand All @@ -119,7 +118,8 @@ func (s *State) Hash() []byte {
// Mutates the block in place and updates it with new state hash.
func (s *State) SetBlockStateHash(block *types.Block) error {
sCopy := s.Copy()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

be nice to have evc: nil on state.Copy()

err := execBlock(sCopy, block, types.PartSetHeader{}, false) // don't fire events
// sCopy has no event cache in it, so this won't fire events
err := execBlock(sCopy, block, types.PartSetHeader{})
if err != nil {
return err
}
Expand Down Expand Up @@ -268,9 +268,9 @@ func (s *State) LoadStorage(hash []byte) (storage merkle.Tree) {
// State.storage
//-------------------------------------

// implements events.Eventable
func (s *State) SetEventSwitch(evsw *events.EventSwitch) {
s.evsw = evsw
// Implements events.Eventable. Typically uses events.EventCache
func (s *State) SetFireable(evc events.Fireable) {
s.evc = evc
}

//-----------------------------------------------------------------------------
Expand Down
Loading