Skip to content

Commit

Permalink
websocket: extract WebSocket code into a separate package
Browse files Browse the repository at this point in the history
  • Loading branch information
lebauce committed Sep 10, 2018
1 parent a25d9cf commit 5325336
Show file tree
Hide file tree
Showing 35 changed files with 358 additions and 341 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ flow/flow.pb.go: flow/flow.proto
sed -e 's/type Flow struct {/type Flow struct { XXX_state flowState `json:"-"`/' -i $@
gofmt -s -w $@

.proto: govendor flow/flow.pb.go filters/filters.pb.go http/wsstructmessage.pb.go
.proto: govendor flow/flow.pb.go filters/filters.pb.go websocket/wsstructmessage.pb.go

.PHONY: .proto.clean
.proto.clean:
Expand Down Expand Up @@ -525,7 +525,7 @@ docker-cross-build: ebpf.build
SKYDIVE_PROTO_FILES:= \
flow/flow.proto \
filters/filters.proto \
http/wsstructmessage.proto
websocket/wsstructmessage.proto

SKYDIVE_TAR_INPUT:= \
vendor \
Expand Down
23 changes: 12 additions & 11 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ import (
"github.com/skydive-project/skydive/topology"
"github.com/skydive-project/skydive/topology/graph"
"github.com/skydive-project/skydive/topology/graph/traversal"
ws "github.com/skydive-project/skydive/websocket"
)

// Agent object started on each hosts/namespaces
type Agent struct {
shttp.DefaultWSSpeakerEventHandler
ws.DefaultWSSpeakerEventHandler
graph *graph.Graph
wsServer *shttp.WSStructServer
analyzerClientPool *shttp.WSStructClientPool
wsServer *ws.WSStructServer
analyzerClientPool *ws.WSStructClientPool
topologyEndpoint *topology.TopologySubscriberEndpoint
rootNode *graph.Node
topologyProbeBundle *probe.ProbeBundle
Expand All @@ -66,8 +67,8 @@ type Agent struct {

// NewAnalyzerWSStructClientPool creates a new http WebSocket client Pool
// with authentification
func NewAnalyzerWSStructClientPool(authOptions *shttp.AuthenticationOpts) (*shttp.WSStructClientPool, error) {
pool := shttp.NewWSStructClientPool("AnalyzerClientPool")
func NewAnalyzerWSStructClientPool(authOptions *shttp.AuthenticationOpts) (*ws.WSStructClientPool, error) {
pool := ws.NewWSStructClientPool("AnalyzerClientPool")

addresses, err := config.GetAnalyzerServiceAddresses()
if err != nil {
Expand All @@ -80,7 +81,7 @@ func NewAnalyzerWSStructClientPool(authOptions *shttp.AuthenticationOpts) (*shtt
}

for _, sa := range addresses {
c := shttp.NewWSClientFromConfig(common.AgentService, config.GetURL("ws", sa.Addr, sa.Port, "/ws/agent"), authOptions, nil)
c := ws.NewWSClientFromConfig(common.AgentService, config.GetURL("ws", sa.Addr, sa.Port, "/ws/agent"), authOptions, nil)
pool.AddClient(c)
}

Expand All @@ -89,13 +90,13 @@ func NewAnalyzerWSStructClientPool(authOptions *shttp.AuthenticationOpts) (*shtt

// AnalyzerConnStatus represents the status of a connection to an analyzer
type AnalyzerConnStatus struct {
shttp.WSConnStatus
ws.ConnStatus
IsMaster bool
}

// AgentStatus represents the status of an agent
type AgentStatus struct {
Clients map[string]shttp.WSConnStatus
Clients map[string]ws.ConnStatus
Analyzers map[string]AnalyzerConnStatus
TopologyProbes []string
FlowProbes []string
Expand All @@ -112,8 +113,8 @@ func (a *Agent) GetStatus() interface{} {
analyzers := make(map[string]AnalyzerConnStatus)
for id, status := range a.analyzerClientPool.GetStatus() {
analyzers[id] = AnalyzerConnStatus{
WSConnStatus: status,
IsMaster: status.Addr == masterAddr && status.Port == masterPort,
ConnStatus: status,
IsMaster: status.Addr == masterAddr && status.Port == masterPort,
}
}

Expand Down Expand Up @@ -192,7 +193,7 @@ func NewAgent() (*Agent, error) {
return nil, err
}

wsServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/subscriber", apiAuthBackend))
wsServer := ws.NewWSStructServer(ws.NewWSServer(hserver, "/ws/subscriber", apiAuthBackend))

// declare all extension available throught API and filtering
tr := traversal.NewGremlinTraversalParser()
Expand Down
30 changes: 15 additions & 15 deletions agent/topology_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ package agent

import (
"github.com/skydive-project/skydive/config"
shttp "github.com/skydive-project/skydive/http"
"github.com/skydive-project/skydive/logging"
"github.com/skydive-project/skydive/topology/graph"
ws "github.com/skydive-project/skydive/websocket"
)

// TopologyForwarder forwards the topology to only one master server.
// When switching from one analyzer to another one the agent does a full
// re-sync since some messages could have been lost.
type TopologyForwarder struct {
masterElection *shttp.WSMasterElection
masterElection *ws.WSMasterElection
graph *graph.Graph
host string
}
Expand All @@ -45,15 +45,15 @@ func (t *TopologyForwarder) triggerResync() {
defer t.graph.RUnlock()

// request for deletion of everything belonging this host
t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.HostGraphDeletedMsgType, t.host))
t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.HostGraphDeletedMsgType, t.host))

// re-add all the nodes and edges
t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.SyncMsgType, t.graph))
t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.SyncMsgType, t.graph))
}

// OnNewMaster is called by the master election mechanism when a new master is elected. In
// such case a "Re-sync" is triggerd in order to be in sync with the new master.
func (t *TopologyForwarder) OnNewMaster(c shttp.WSSpeaker) {
func (t *TopologyForwarder) OnNewMaster(c ws.WSSpeaker) {
if c == nil {
logging.GetLogger().Warn("Lost connection to master")
} else {
Expand All @@ -65,43 +65,43 @@ func (t *TopologyForwarder) OnNewMaster(c shttp.WSSpeaker) {

// OnNodeUpdated graph node updated event. Implements the GraphEventListener interface.
func (t *TopologyForwarder) OnNodeUpdated(n *graph.Node) {
t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.NodeUpdatedMsgType, n))
t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.NodeUpdatedMsgType, n))
}

// OnNodeAdded graph node added event. Implements the GraphEventListener interface.
func (t *TopologyForwarder) OnNodeAdded(n *graph.Node) {
t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.NodeAddedMsgType, n))
t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.NodeAddedMsgType, n))
}

// OnNodeDeleted graph node deleted event. Implements the GraphEventListener interface.
func (t *TopologyForwarder) OnNodeDeleted(n *graph.Node) {
t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.NodeDeletedMsgType, n))
t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.NodeDeletedMsgType, n))
}

// OnEdgeUpdated graph edge updated event. Implements the GraphEventListener interface.
func (t *TopologyForwarder) OnEdgeUpdated(e *graph.Edge) {
t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.EdgeUpdatedMsgType, e))
t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.EdgeUpdatedMsgType, e))
}

// OnEdgeAdded graph edge added event. Implements the GraphEventListener interface.
func (t *TopologyForwarder) OnEdgeAdded(e *graph.Edge) {
t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.EdgeAddedMsgType, e))
t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.EdgeAddedMsgType, e))
}

// OnEdgeDeleted graph edge deleted event. Implements the GraphEventListener interface.
func (t *TopologyForwarder) OnEdgeDeleted(e *graph.Edge) {
t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.EdgeDeletedMsgType, e))
t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.EdgeDeletedMsgType, e))
}

// GetMaster returns the current analyzer the agent is sending its events to
func (t *TopologyForwarder) GetMaster() shttp.WSSpeaker {
func (t *TopologyForwarder) GetMaster() ws.WSSpeaker {
return t.masterElection.GetMaster()
}

// NewTopologyForwarder returns a new Graph forwarder which forwards event of the given graph
// to the given WebSocket JSON speakers.
func NewTopologyForwarder(host string, g *graph.Graph, pool shttp.WSStructSpeakerPool) *TopologyForwarder {
masterElection := shttp.NewWSMasterElection(pool)
func NewTopologyForwarder(host string, g *graph.Graph, pool ws.WSStructSpeakerPool) *TopologyForwarder {
masterElection := ws.NewWSMasterElection(pool)

t := &TopologyForwarder{
masterElection: masterElection,
Expand All @@ -116,7 +116,7 @@ func NewTopologyForwarder(host string, g *graph.Graph, pool shttp.WSStructSpeake
}

// NewTopologyForwarderFromConfig creates a TopologyForwarder from configuration
func NewTopologyForwarderFromConfig(g *graph.Graph, pool shttp.WSStructSpeakerPool) *TopologyForwarder {
func NewTopologyForwarderFromConfig(g *graph.Graph, pool ws.WSStructSpeakerPool) *TopologyForwarder {
host := config.GetString("host_id")
return NewTopologyForwarder(host, g, pool)
}
8 changes: 4 additions & 4 deletions alert/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ import (
"github.com/skydive-project/skydive/api/types"
"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/etcd"
shttp "github.com/skydive-project/skydive/http"
"github.com/skydive-project/skydive/js"
"github.com/skydive-project/skydive/logging"
"github.com/skydive-project/skydive/topology/graph"
"github.com/skydive-project/skydive/topology/graph/traversal"
ws "github.com/skydive-project/skydive/websocket"
)

const (
Expand Down Expand Up @@ -194,7 +194,7 @@ type Server struct {
common.RWMutex
*etcd.MasterElector
Graph *graph.Graph
Pool shttp.WSStructSpeakerPool
Pool ws.WSStructSpeakerPool
AlertHandler api.Handler
apiServer *api.Server
watcher api.StoppableWatcher
Expand Down Expand Up @@ -232,7 +232,7 @@ func (a *Server) triggerAlert(al *GremlinAlert, data interface{}) error {
}
}()

wsMsg := shttp.NewWSStructMessage(Namespace, "Alert", msg)
wsMsg := ws.NewWSStructMessage(Namespace, "Alert", msg)
a.Pool.BroadcastMessage(wsMsg)

logging.GetLogger().Debugf("Alert %s of type %s was triggerred", al.UUID, al.Action)
Expand Down Expand Up @@ -404,7 +404,7 @@ func (a *Server) Stop() {
}

// Returns a new alerting server
func NewServer(apiServer *api.Server, pool shttp.WSStructSpeakerPool, graph *graph.Graph, parser *traversal.GremlinTraversalParser, etcdClient *etcd.Client) (*Server, error) {
func NewServer(apiServer *api.Server, pool ws.WSStructSpeakerPool, graph *graph.Graph, parser *traversal.GremlinTraversalParser, etcdClient *etcd.Client) (*Server, error) {
elector := etcd.NewMasterElectorFromConfig(common.AnalyzerService, "alert-server", etcdClient)

jsre, err := js.NewJSRE()
Expand Down
16 changes: 8 additions & 8 deletions analyzer/flow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ import (
"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/config"
"github.com/skydive-project/skydive/flow"
shttp "github.com/skydive-project/skydive/http"
"github.com/skydive-project/skydive/logging"
ws "github.com/skydive-project/skydive/websocket"
)

// FlowClientPool describes a flow client pool.
type FlowClientPool struct {
common.RWMutex
shttp.DefaultWSSpeakerEventHandler
ws.DefaultWSSpeakerEventHandler
flowClients []*FlowClient
}

Expand All @@ -66,9 +66,9 @@ type FlowClientUDPConn struct {

// FlowClientWebSocketConn describes WebSocket client connection
type FlowClientWebSocketConn struct {
shttp.DefaultWSSpeakerEventHandler
ws.DefaultWSSpeakerEventHandler
url *url.URL
wsClient *shttp.WSClient
wsClient *ws.WSClient
}

// Close the connection
Expand Down Expand Up @@ -108,7 +108,7 @@ func (c *FlowClientWebSocketConn) Close() error {
// Connect to the WebSocket flow server
func (c *FlowClientWebSocketConn) Connect() error {
authOptions := AnalyzerClusterAuthenticationOpts()
c.wsClient = shttp.NewWSClientFromConfig(common.AgentService, c.url, authOptions, nil)
c.wsClient = ws.NewWSClientFromConfig(common.AgentService, c.url, authOptions, nil)
c.wsClient.Connect()
c.wsClient.AddEventHandler(c)

Expand Down Expand Up @@ -195,7 +195,7 @@ func NewFlowClient(addr string, port int) (*FlowClient, error) {
}

// OnConnected websocket event handler
func (p *FlowClientPool) OnConnected(c shttp.WSSpeaker) {
func (p *FlowClientPool) OnConnected(c ws.WSSpeaker) {
p.Lock()
defer p.Unlock()

Expand All @@ -219,7 +219,7 @@ func (p *FlowClientPool) OnConnected(c shttp.WSSpeaker) {
}

// OnDisconnected websocket event handler
func (p *FlowClientPool) OnDisconnected(c shttp.WSSpeaker) {
func (p *FlowClientPool) OnDisconnected(c ws.WSSpeaker) {
p.Lock()
defer p.Unlock()

Expand Down Expand Up @@ -256,7 +256,7 @@ func (p *FlowClientPool) Close() {
// NewFlowClientPool returns a new FlowClientPool using the websocket connections
// to maintain the pool of client up to date according to the websocket connections
// status.
func NewFlowClientPool(pool shttp.WSSpeakerPool) *FlowClientPool {
func NewFlowClientPool(pool ws.WSSpeakerPool) *FlowClientPool {
p := &FlowClientPool{
flowClients: make([]*FlowClient, 0),
}
Expand Down
7 changes: 4 additions & 3 deletions analyzer/flow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/skydive-project/skydive/logging"
"github.com/skydive-project/skydive/probe"
"github.com/skydive-project/skydive/topology/graph"
ws "github.com/skydive-project/skydive/websocket"
)

const (
Expand Down Expand Up @@ -76,7 +77,7 @@ type FlowServerUDPConn struct {

// FlowServerWebSocketConn describes a WebSocket flow server connection
type FlowServerWebSocketConn struct {
shttp.DefaultWSSpeakerEventHandler
ws.DefaultWSSpeakerEventHandler
server *shttp.Server
ch chan *flow.Flow
timeOfLastLostFlowsLog time.Time
Expand All @@ -101,7 +102,7 @@ type FlowServer struct {
}

// OnMessage event
func (c *FlowServerWebSocketConn) OnMessage(client shttp.WSSpeaker, m shttp.WSMessage) {
func (c *FlowServerWebSocketConn) OnMessage(client ws.WSSpeaker, m ws.WSMessage) {
f, err := flow.FromData(m.Bytes(client.GetClientProtocol()))
if err != nil {
logging.GetLogger().Errorf("Error while parsing flow: %s", err.Error())
Expand All @@ -124,7 +125,7 @@ func (c *FlowServerWebSocketConn) OnMessage(client shttp.WSSpeaker, m shttp.WSMe
// Serve starts a WebSocket flow server
func (c *FlowServerWebSocketConn) Serve(ch chan *flow.Flow, quit chan struct{}, wg *sync.WaitGroup) {
c.ch = ch
server := shttp.NewWSServer(c.server, "/ws/flow", c.auth)
server := ws.NewWSServer(c.server, "/ws/flow", c.auth)
server.AddEventHandler(c)
go func() {
server.Start()
Expand Down
Loading

0 comments on commit 5325336

Please sign in to comment.