Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websockets #52

Merged
merged 12 commits into from
Apr 17, 2015
Prev Previous commit
more fixes from review
  • Loading branch information
ebuchman committed Apr 17, 2015
commit 65f669160fff441a807929ff443f19e3ad454933
2 changes: 2 additions & 0 deletions events/event_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ func (evc *EventCache) FireEvent(event string, msg interface{}) {
}

// Fire events by running evsw.FireEvent on all cached events. Blocks.
// Clears cached events
func (evc *EventCache) Flush() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clear out the evc.events

for _, ei := range evc.events {
evc.evsw.FireEvent(ei.event, ei.msg)
}
evc.events = make([]eventInfo, eventsBufferSize)
}
4 changes: 4 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor {
return n.mempoolReactor
}

func (n *Node) EventSwitch() *events.EventSwitch {
return n.evsw
}

//------------------------------------------------------------------------------

func RunNode() {
Expand Down
48 changes: 27 additions & 21 deletions rpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,9 @@ type WSResponse struct {
}

// a single websocket connection
// contains the listeners id
type Connection struct {
// contains listener id, underlying ws connection,
// and the event switch for subscribing to events
type WSConnection struct {
id string
wsConn *websocket.Conn
writeChan chan WSResponse
Expand All @@ -225,16 +226,16 @@ type Connection struct {
}

// new websocket connection wrapper
func NewConnection(wsConn *websocket.Conn) *Connection {
return &Connection{
func NewWSConnection(wsConn *websocket.Conn) *WSConnection {
return &WSConnection{
id: wsConn.RemoteAddr().String(),
wsConn: wsConn,
writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full
}
}

// start the connection and hand her the event switch
func (con *Connection) Start(evsw *events.EventSwitch) {
func (con *WSConnection) Start(evsw *events.EventSwitch) {
if atomic.CompareAndSwapUint32(&con.started, 0, 1) {
con.evsw = evsw

Expand All @@ -246,15 +247,29 @@ func (con *Connection) Start(evsw *events.EventSwitch) {
}

// close the connection
func (con *Connection) Stop() {
func (con *WSConnection) Stop() {
if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) {
con.wsConn.Close()
close(con.writeChan)
}
}

// attempt to write response to writeChan and record failures
func (con *WSConnection) safeWrite(resp WSResponse) {
select {
case con.writeChan <- resp:
// yay
con.failedSends = 0
default:
// channel is full
// if this happens too many times in a row,
// close connection
con.failedSends += 1
}
}

// read from the socket and subscribe to or unsubscribe from events
func (con *Connection) read() {
func (con *WSConnection) read() {
reaper := time.Tick(time.Second * WSConnectionReaperSeconds)
for {
select {
Expand All @@ -278,7 +293,7 @@ func (con *Connection) read() {
err = json.Unmarshal(in, &req)
if err != nil {
errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
con.writeChan <- WSResponse{Error: errStr}
con.safeWrite(WSResponse{Error: errStr})
continue
}
switch req.Type {
Expand All @@ -289,16 +304,7 @@ func (con *Connection) read() {
Event: req.Event,
Data: msg,
}
select {
case con.writeChan <- resp:
// yay
con.failedSends = 0
default:
// channel is full
// if this happens too many times,
// close connection
con.failedSends += 1
}
con.safeWrite(resp)
})
case "unsubscribe":
if req.Event != "" {
Expand All @@ -307,15 +313,15 @@ func (con *Connection) read() {
con.evsw.RemoveListener(con.id)
}
default:
con.writeChan <- WSResponse{Error: "Unknown request type: " + req.Type}
con.safeWrite(WSResponse{Error: "Unknown request type: " + req.Type})
}

}
}
}

// receives on a write channel and writes out on the socket
func (con *Connection) write() {
func (con *WSConnection) write() {
n, err := new(int64), new(error)
for {
msg, more := <-con.writeChan
Expand Down Expand Up @@ -369,7 +375,7 @@ func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Requ
}

// register connection
con := NewConnection(wsConn)
con := NewWSConnection(wsConn)
log.Info("New websocket connection", "origin", con.id)
con.Start(wm.evsw)
}
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
1 change: 1 addition & 0 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (s *State) Copy() *State {
UnbondingValidators: s.UnbondingValidators.Copy(), // copy the valSet lazily.
accounts: s.accounts.Copy(),
validatorInfos: s.validatorInfos.Copy(),
evc: nil,
}
}

Expand Down
2 changes: 1 addition & 1 deletion state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func execTxWithState(state *State, tx types.Tx, runCall bool) error {
cache := NewBlockCache(state)
err := ExecTx(cache, tx, runCall, false)
err := ExecTx(cache, tx, runCall, nil)
if err != nil {
return err
} else {
Expand Down
7 changes: 4 additions & 3 deletions vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga
x, y := stack.Pop64(), stack.Pop64()
stack.Push64(x & y)
dbg.Printf(" %v & %v = %v\n", x, y, x&y)

case OR: // 0x17
x, y := stack.Pop64(), stack.Pop64()
stack.Push64(x | y)
Expand Down Expand Up @@ -381,7 +380,7 @@ func (vm *VM) call(caller, callee *Account, code, input []byte, value uint64, ga
return nil, firstErr(err, ErrInputOutOfBounds)
}
stack.Push(RightPadWord256(data))
dbg.Printf(" => 0x%X\n", data)
dbg.Printf(" => 0x%X\n", RightPadWord256(data))

case CALLDATASIZE: // 0x36
stack.Push64(uint64(len(input)))
Expand Down Expand Up @@ -721,10 +720,12 @@ func subslice(data []byte, offset, length uint64, flip_ bool) (ret []byte, ok bo
if size < offset {
return nil, false
} else if size < offset+length {
ret, ok = data[offset:], false
ret, ok = data[offset:], true
ret = RightPadBytes(ret, 32)
} else {
ret, ok = data[offset:offset+length], true
}

if flip_ {
ret = flip(ret)
}
Expand Down