-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Websockets #52
Merged
Merged
Websockets #52
Changes from 1 commit
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
12814db
rpc: use gorilla websockets
ebuchman 8e24b12
rpc: first successful websocket event subscription
ebuchman 85661de
fire events, event urls
ebuchman 2e918e8
rpc: websocket events testing
ebuchman 860d547
rpc: use NewBlock event in rpc tests
ebuchman b4388ae
vm: eventable and flip fix on CALL address
ebuchman 34098ed
state: fireEvents flag on ExecTx and fixes for GetAccount
ebuchman 9b76cfe
rpc: cleanup tests and test contract calls
ebuchman a1c5e32
fixes from review with jae
ebuchman d27e0bb
event cache and fireable interace
ebuchman 75049ec
major flippage for vm addrs. now left padded words for tx_cache
ebuchman 65f6691
more fixes from review
ebuchman File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
rpc: first successful websocket event subscription
- Loading branch information
commit 8e24b128884797a289b1ebb36f973b1eff9de8f0
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,7 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { | |
func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) { | ||
// websocket endpoint | ||
w := NewWebsocketManager(evsw) | ||
http.HandleFunc("/events", w.websocketHandler) // websocket.Handler(w.eventsHandler)) | ||
mux.HandleFunc("/events", w.websocketHandler) // websocket.Handler(w.eventsHandler)) | ||
} | ||
|
||
//------------------------------------- | ||
|
@@ -191,14 +191,20 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { | |
//----------------------------------------------------------------------------- | ||
// rpc.websocket | ||
|
||
const ( | ||
WSConnectionReaperSeconds = 5 | ||
MaxFailedSendsSeconds = 10 | ||
WriteChanBufferSize = 10 | ||
) | ||
|
||
// for requests coming in | ||
type WsRequest struct { | ||
type WSRequest struct { | ||
Type string // subscribe or unsubscribe | ||
Event string | ||
} | ||
|
||
// for responses going out | ||
type WsResponse struct { | ||
type WSResponse struct { | ||
Event string | ||
Data interface{} | ||
Error string | ||
|
@@ -209,7 +215,7 @@ type WsResponse struct { | |
type Connection struct { | ||
id string | ||
wsCon *websocket.Conn | ||
writeChan chan WsResponse | ||
writeChan chan WSResponse | ||
quitChan chan struct{} | ||
failedSends uint | ||
} | ||
|
@@ -219,7 +225,7 @@ func NewConnection(con *websocket.Conn) *Connection { | |
return &Connection{ | ||
id: con.RemoteAddr().String(), | ||
wsCon: con, | ||
writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full | ||
writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full | ||
} | ||
} | ||
|
||
|
@@ -276,15 +282,9 @@ func (w *WebsocketManager) handleWebsocket(con *websocket.Conn) { | |
w.write(c) | ||
} | ||
|
||
const ( | ||
WsConnectionReaperSeconds = 5 | ||
MaxFailedSendsSeconds = 10 | ||
WriteChanBuffer = 10 | ||
) | ||
|
||
// read from the socket and subscribe to or unsubscribe from events | ||
func (w *WebsocketManager) read(con *Connection) { | ||
reaper := time.Tick(time.Second * WsConnectionReaperSeconds) | ||
reaper := time.Tick(time.Second * WSConnectionReaperSeconds) | ||
for { | ||
select { | ||
case <-reaper: | ||
|
@@ -302,17 +302,17 @@ func (w *WebsocketManager) read(con *Connection) { | |
// so kill the connection | ||
con.quitChan <- struct{}{} | ||
} | ||
var req WsRequest | ||
var req WSRequest | ||
err = json.Unmarshal(in, &req) | ||
if err != nil { | ||
errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) | ||
con.writeChan <- WsResponse{Error: errStr} | ||
con.writeChan <- WSResponse{Error: errStr} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. need select{default:} here because writeChan couldhave been closed after wsConn is closed. |
||
} | ||
switch req.Type { | ||
case "subscribe": | ||
log.Info("New event subscription", "con id", con.id, "event", req.Event) | ||
w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) { | ||
resp := WsResponse{ | ||
resp := WSResponse{ | ||
Event: req.Event, | ||
Data: msg, | ||
} | ||
|
@@ -334,7 +334,7 @@ func (w *WebsocketManager) read(con *Connection) { | |
w.ew.RemoveListener(con.id) | ||
} | ||
default: | ||
con.writeChan <- WsResponse{Error: "Unknown request type: " + req.Type} | ||
con.writeChan <- WSResponse{Error: "Unknown request type: " + req.Type} | ||
} | ||
|
||
} | ||
|
@@ -350,7 +350,7 @@ func (w *WebsocketManager) write(con *Connection) { | |
buf := new(bytes.Buffer) | ||
binary.WriteJSON(msg, buf, n, err) | ||
if *err != nil { | ||
log.Error("Failed to write JSON WsResponse", "error", err) | ||
log.Error("Failed to write JSON WSResponse", "error", err) | ||
} else { | ||
//websocket.Message.Send(con.wsCon, buf.Bytes()) | ||
if err := con.wsCon.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a second?