Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websockets #52

Merged
merged 12 commits into from
Apr 17, 2015
Prev Previous commit
Next Next commit
rpc: first successful websocket event subscription
  • Loading branch information
ebuchman committed Apr 9, 2015
commit 8e24b128884797a289b1ebb36f973b1eff9de8f0
1 change: 1 addition & 0 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (conR *ConsensusReactor) ResetToState(state *sm.State) {
// implements events.Eventable
func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
conR.evsw = evsw
conR.conS.SetEventSwitch(evsw)
}

//--------------------------------------
Expand Down
10 changes: 10 additions & 0 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/config"
. "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/events"
mempl "github.com/tendermint/tendermint/mempool"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
Expand Down Expand Up @@ -246,6 +247,8 @@ type ConsensusState struct {
stagedBlock *types.Block // Cache last staged block.
stagedState *sm.State // Cache result of staged block.
lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on.

evsw *events.EventSwitch
}

func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState {
Expand Down Expand Up @@ -437,6 +440,8 @@ ACTION_LOOP:
if cs.TryFinalizeCommit(rs.Height) {
// Now at new height
// cs.Step is at RoundStepNewHeight or RoundStepNewRound.
// newblock event!
cs.evsw.FireEvent("newblock", cs.state.LastBlockHash)
scheduleNextAction()
continue ACTION_LOOP
} else {
Expand Down Expand Up @@ -1107,6 +1112,11 @@ func (cs *ConsensusState) saveCommitVoteBlock(block *types.Block, blockParts *ty
}
}

// implements events.Eventable
func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) {
cs.evsw = evsw
}

//-----------------------------------------------------------------------------

// total duration of given round
Expand Down
4 changes: 0 additions & 4 deletions rpc/core/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ import (
"github.com/tendermint/tendermint/rpc"
)

/*
TODO: support Call && GetStorage.
*/

var Routes = map[string]*rpc.RPCFunc{
"status": rpc.NewRPCFunc(Status, []string{}),
"net_info": rpc.NewRPCFunc(NetInfo, []string{}),
Expand Down
34 changes: 17 additions & 17 deletions rpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) {
// websocket endpoint
w := NewWebsocketManager(evsw)
http.HandleFunc("/events", w.websocketHandler) // websocket.Handler(w.eventsHandler))
mux.HandleFunc("/events", w.websocketHandler) // websocket.Handler(w.eventsHandler))
}

//-------------------------------------
Expand Down Expand Up @@ -191,14 +191,20 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
//-----------------------------------------------------------------------------
// rpc.websocket

const (
WSConnectionReaperSeconds = 5
MaxFailedSendsSeconds = 10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a second?

WriteChanBufferSize = 10
)

// for requests coming in
type WsRequest struct {
type WSRequest struct {
Type string // subscribe or unsubscribe
Event string
}

// for responses going out
type WsResponse struct {
type WSResponse struct {
Event string
Data interface{}
Error string
Expand All @@ -209,7 +215,7 @@ type WsResponse struct {
type Connection struct {
id string
wsCon *websocket.Conn
writeChan chan WsResponse
writeChan chan WSResponse
quitChan chan struct{}
failedSends uint
}
Expand All @@ -219,7 +225,7 @@ func NewConnection(con *websocket.Conn) *Connection {
return &Connection{
id: con.RemoteAddr().String(),
wsCon: con,
writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full
writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full
}
}

Expand Down Expand Up @@ -276,15 +282,9 @@ func (w *WebsocketManager) handleWebsocket(con *websocket.Conn) {
w.write(c)
}

const (
WsConnectionReaperSeconds = 5
MaxFailedSendsSeconds = 10
WriteChanBuffer = 10
)

// read from the socket and subscribe to or unsubscribe from events
func (w *WebsocketManager) read(con *Connection) {
reaper := time.Tick(time.Second * WsConnectionReaperSeconds)
reaper := time.Tick(time.Second * WSConnectionReaperSeconds)
for {
select {
case <-reaper:
Expand All @@ -302,17 +302,17 @@ func (w *WebsocketManager) read(con *Connection) {
// so kill the connection
con.quitChan <- struct{}{}
}
var req WsRequest
var req WSRequest
err = json.Unmarshal(in, &req)
if err != nil {
errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
con.writeChan <- WsResponse{Error: errStr}
con.writeChan <- WSResponse{Error: errStr}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need select{default:} here because writeChan couldhave been closed after wsConn is closed.
and more places prob need it too

}
switch req.Type {
case "subscribe":
log.Info("New event subscription", "con id", con.id, "event", req.Event)
w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
resp := WsResponse{
resp := WSResponse{
Event: req.Event,
Data: msg,
}
Expand All @@ -334,7 +334,7 @@ func (w *WebsocketManager) read(con *Connection) {
w.ew.RemoveListener(con.id)
}
default:
con.writeChan <- WsResponse{Error: "Unknown request type: " + req.Type}
con.writeChan <- WSResponse{Error: "Unknown request type: " + req.Type}
}

}
Expand All @@ -350,7 +350,7 @@ func (w *WebsocketManager) write(con *Connection) {
buf := new(bytes.Buffer)
binary.WriteJSON(msg, buf, n, err)
if *err != nil {
log.Error("Failed to write JSON WsResponse", "error", err)
log.Error("Failed to write JSON WSResponse", "error", err)
} else {
//websocket.Message.Send(con.wsCon, buf.Bytes())
if err := con.wsCon.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
Expand Down
1 change: 1 addition & 0 deletions rpc/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/alert"
)

func StartHTTPServer(listenAddr string, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) {
Expand Down
12 changes: 4 additions & 8 deletions rpc/test/client_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,10 @@ func TestWSConnect(t *testing.T) {
dialer := websocket.DefaultDialer
rHeader := http.Header{}
_, r, err := dialer.Dial(websocketAddr, rHeader)
fmt.Println("respoinse:", r)
if err != nil {
t.Fatal(err)
}
fmt.Println("respoinse:", r)

}

func TestWSSubscribe(t *testing.T) {
Expand All @@ -99,16 +98,13 @@ func TestWSSubscribe(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = con.WriteJSON(rpc.WsRequest{
err = con.WriteJSON(rpc.WSRequest{
Type: "subscribe",
Event: "newblock",
})
if err != nil {
t.Fatal(err)
}
/*
typ, p, err := con.ReadMessage()
fmt.Println("RESPONSE:", typ, string(p), err)
*/

typ, p, err := con.ReadMessage()
fmt.Println("RESPONSE:", typ, string(p), err)
}