Skip to content

Commit

Permalink
fixes from review with jae
Browse files Browse the repository at this point in the history
  • Loading branch information
ebuchman committed Apr 16, 2015
1 parent 9b76cfe commit c2f8463
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 127 deletions.
6 changes: 6 additions & 0 deletions account/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"

"github.com/tendermint/tendermint/binary"
"github.com/tendermint/tendermint/merkle"
)

// Signable is an interface for all signable things.
Expand All @@ -24,6 +25,11 @@ func SignBytes(o Signable) []byte {
return buf.Bytes()
}

// HashSignBytes is a convenience method for getting the hash of the bytes of a signable
func HashSignBytes(o Signable) []byte {
return merkle.HashFromBinary(SignBytes(o))
}

//-----------------------------------------------------------------------------

// Account resides in the application state, and is mutated by transactions
Expand Down
1 change: 1 addition & 0 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ ACTION_LOOP:
// cs.Step is at RoundStepNewHeight or RoundStepNewRound.
newBlock := cs.blockStore.LoadBlock(cs.state.LastBlockHeight)
cs.evsw.FireEvent(types.EventStringNewBlock(), newBlock)
// TODO: go fire events from event cache
scheduleNextAction()
continue ACTION_LOOP
} else {
Expand Down
144 changes: 74 additions & 70 deletions rpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io/ioutil"
"net/http"
"reflect"
"sync/atomic"
"time"
)

Expand All @@ -25,8 +26,8 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {

func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) {
// websocket endpoint
w := NewWebsocketManager(evsw)
mux.HandleFunc("/events", w.websocketHandler) // websocket.Handler(w.eventsHandler))
wm := NewWebsocketManager(evsw)
mux.HandleFunc("/events", wm.websocketHandler) // websocket.Handler(w.eventsHandler))
}

//-------------------------------------
Expand Down Expand Up @@ -193,7 +194,7 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {

const (
WSConnectionReaperSeconds = 5
MaxFailedSendsSeconds = 10
MaxFailedSends = 10
WriteChanBufferSize = 10
)

Expand All @@ -214,103 +215,79 @@ type WSResponse struct {
// contains the listeners id
type Connection struct {
id string
wsCon *websocket.Conn
wsConn *websocket.Conn
writeChan chan WSResponse
quitChan chan struct{}
failedSends uint
started uint32
stopped uint32

evsw *events.EventSwitch
}

// new websocket connection wrapper
func NewConnection(con *websocket.Conn) *Connection {
func NewConnection(wsConn *websocket.Conn) *Connection {
return &Connection{
id: con.RemoteAddr().String(),
wsCon: con,
id: wsConn.RemoteAddr().String(),
wsConn: wsConn,
writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full
quitChan: make(chan struct{}),
}
}

// close the connection
func (c *Connection) Close() {
c.wsCon.Close()
close(c.writeChan)
close(c.quitChan)
}
// start the connection and hand her the event switch
func (con *Connection) Start(evsw *events.EventSwitch) {
if atomic.CompareAndSwapUint32(&con.started, 0, 1) {
con.evsw = evsw

// main manager for all websocket connections
// holds the event switch
type WebsocketManager struct {
websocket.Upgrader
ew *events.EventSwitch
cons map[string]*Connection
}

func NewWebsocketManager(ew *events.EventSwitch) *WebsocketManager {
return &WebsocketManager{
ew: ew,
cons: make(map[string]*Connection),
Upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// TODO
return true
},
},
// read subscriptions/unsubscriptions to events
go con.read()
// write responses
con.write()
}
}

func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) {
conn, err := wm.Upgrade(w, r, nil)
if err != nil {
// TODO
log.Error("Failed to upgrade to websocket connection", "error", err)
return
// close the connection
func (con *Connection) Stop() {
if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) {
con.wsConn.Close()
close(con.writeChan)
close(con.quitChan)
}
wm.handleWebsocket(conn)

}

func (w *WebsocketManager) handleWebsocket(con *websocket.Conn) {
// register connection
c := NewConnection(con)
w.cons[c.id] = c
log.Info("New websocket connection", "origin", c.id)

// read subscriptions/unsubscriptions to events
go w.read(c)
// write responses
w.write(c)
}

// read from the socket and subscribe to or unsubscribe from events
func (w *WebsocketManager) read(con *Connection) {
func (con *Connection) read() {
reaper := time.Tick(time.Second * WSConnectionReaperSeconds)
for {
select {
case <-reaper:
if con.failedSends > MaxFailedSendsSeconds {
if con.failedSends > MaxFailedSends {
// sending has failed too many times.
// kill the connection
con.quitChan <- struct{}{}
con.Stop()
break
}
default:
var in []byte
_, in, err := con.wsCon.ReadMessage()
_, in, err := con.wsConn.ReadMessage()
if err != nil {
// an error reading the connection,
// so kill the connection
con.quitChan <- struct{}{}
con.Stop()
break
}
var req WSRequest
err = json.Unmarshal(in, &req)
if err != nil {
errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
con.writeChan <- WSResponse{Error: errStr}
continue
}
switch req.Type {
case "subscribe":
log.Info("New event subscription", "con id", con.id, "event", req.Event)
w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
con.evsw.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
resp := WSResponse{
Event: req.Event,
Data: msg,
Expand All @@ -328,9 +305,9 @@ func (w *WebsocketManager) read(con *Connection) {
})
case "unsubscribe":
if req.Event != "" {
w.ew.RemoveListenerForEvent(req.Event, con.id)
con.evsw.RemoveListenerForEvent(req.Event, con.id)
} else {
w.ew.RemoveListener(con.id)
con.evsw.RemoveListener(con.id)
}
default:
con.writeChan <- WSResponse{Error: "Unknown request type: " + req.Type}
Expand All @@ -340,8 +317,8 @@ func (w *WebsocketManager) read(con *Connection) {
}
}

// receives on a write channel and writes out to the socket
func (w *WebsocketManager) write(con *Connection) {
// receives on a write channel and writes out on the socket
func (con *Connection) write() {
n, err := new(int64), new(error)
for {
select {
Expand All @@ -351,22 +328,49 @@ func (w *WebsocketManager) write(con *Connection) {
if *err != nil {
log.Error("Failed to write JSON WSResponse", "error", err)
} else {
//websocket.Message.Send(con.wsCon, buf.Bytes())
if err := con.wsCon.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
log.Error("Failed to write response on websocket", "error", err)
}
}
case <-con.quitChan:
w.closeConn(con)
return
}
}
}

// close a connection and delete from manager
func (w *WebsocketManager) closeConn(con *Connection) {
con.Close()
delete(w.cons, con.id)
// main manager for all websocket connections
// holds the event switch
type WebsocketManager struct {
websocket.Upgrader
evsw *events.EventSwitch
}

func NewWebsocketManager(evsw *events.EventSwitch) *WebsocketManager {
return &WebsocketManager{
evsw: evsw,
Upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// TODO
return true
},
},
}
}

func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) {
wsConn, err := wm.Upgrade(w, r, nil)
if err != nil {
// TODO - return http error
log.Error("Failed to upgrade to websocket connection", "error", err)
return
}

// register connection
con := NewConnection(wsConn)
log.Info("New websocket connection", "origin", con.id)
con.Start(wm.evsw)
}

// rpc.websocket
Expand Down
5 changes: 2 additions & 3 deletions rpc/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) {
func RecoverAndLogHandler(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Wrap the ResponseWriter to remember the status
rww := &ResponseWriterWrapper{-1, w, w.(http.Hijacker)}
rww := &ResponseWriterWrapper{-1, w}
begin := time.Now()

// Common headers
Expand Down Expand Up @@ -100,7 +100,6 @@ func RecoverAndLogHandler(handler http.Handler) http.Handler {
type ResponseWriterWrapper struct {
Status int
http.ResponseWriter
hj http.Hijacker // necessary for websocket upgrades
}

func (w *ResponseWriterWrapper) WriteHeader(status int) {
Expand All @@ -110,7 +109,7 @@ func (w *ResponseWriterWrapper) WriteHeader(status int) {

// implements http.Hijacker
func (w *ResponseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return w.hj.Hijack()
return w.ResponseWriter.(http.Hijacker).Hijack()
}

// Stick it as a deferred statement in gouroutines to prevent the program from crashing.
Expand Down
Loading

0 comments on commit c2f8463

Please sign in to comment.