From 5325336452b96fe9e2523eacef9db1a7332dc6ff Mon Sep 17 00:00:00 2001 From: Sylvain Baubeau Date: Tue, 28 Aug 2018 17:14:52 +0200 Subject: [PATCH] websocket: extract WebSocket code into a separate package --- Makefile | 4 +- agent/agent.go | 23 ++-- agent/topology_forwarder.go | 30 +++--- alert/server.go | 8 +- analyzer/flow_client.go | 16 +-- analyzer/flow_server.go | 7 +- analyzer/server.go | 51 ++++++--- analyzer/topology_agent_endpoint.go | 12 +-- analyzer/topology_publisher_endpoint.go | 13 +-- analyzer/topology_replication_endpoint.go | 55 +++++----- api/types/types.go | 105 ++++++++----------- cmd/client/status.go | 4 +- flow/client.go | 10 +- flow/ondemand/client/client.go | 22 ++-- flow/ondemand/server/server.go | 10 +- flow/server.go | 8 +- http/auth.go | 2 + http/client.go | 2 +- http/server.go | 8 -- http/tls.go | 2 +- packet_injector/client.go | 14 +-- packet_injector/server.go | 14 +-- tests/helper/helper.go | 10 +- tests/scale_test.go | 6 +- tests/topology_test.go | 15 +-- topology/graph/message.go | 4 +- topology/topology_subscriber_endpoint.go | 36 +++---- {http => websocket}/master.go | 2 +- {http => websocket}/pool.go | 6 +- {http => websocket}/wsclient.go | 121 +++++++++++----------- {http => websocket}/wsmessage.go | 21 ++-- {http => websocket}/wsmessage_test.go | 7 +- {http => websocket}/wsserver.go | 16 ++- {http => websocket}/wsserver_test.go | 33 +++--- {http => websocket}/wsstructmessage.proto | 2 +- 35 files changed, 358 insertions(+), 341 deletions(-) rename {http => websocket}/master.go (99%) rename {http => websocket}/pool.go (98%) rename {http => websocket}/wsclient.go (92%) rename {http => websocket}/wsmessage.go (97%) rename {http => websocket}/wsmessage_test.go (96%) rename {http => websocket}/wsserver.go (89%) rename {http => websocket}/wsserver_test.go (83%) rename {http => websocket}/wsstructmessage.proto (98%) diff --git a/Makefile b/Makefile index c3e26939e8..0108021b59 100644 --- a/Makefile +++ b/Makefile @@ -205,7 +205,7 @@ flow/flow.pb.go: flow/flow.proto sed -e 's/type Flow struct {/type Flow struct { XXX_state flowState `json:"-"`/' -i $@ gofmt -s -w $@ -.proto: govendor flow/flow.pb.go filters/filters.pb.go http/wsstructmessage.pb.go +.proto: govendor flow/flow.pb.go filters/filters.pb.go websocket/wsstructmessage.pb.go .PHONY: .proto.clean .proto.clean: @@ -525,7 +525,7 @@ docker-cross-build: ebpf.build SKYDIVE_PROTO_FILES:= \ flow/flow.proto \ filters/filters.proto \ - http/wsstructmessage.proto + websocket/wsstructmessage.proto SKYDIVE_TAR_INPUT:= \ vendor \ diff --git a/agent/agent.go b/agent/agent.go index 50ee9f339f..86890ccdce 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -43,14 +43,15 @@ import ( "github.com/skydive-project/skydive/topology" "github.com/skydive-project/skydive/topology/graph" "github.com/skydive-project/skydive/topology/graph/traversal" + ws "github.com/skydive-project/skydive/websocket" ) // Agent object started on each hosts/namespaces type Agent struct { - shttp.DefaultWSSpeakerEventHandler + ws.DefaultWSSpeakerEventHandler graph *graph.Graph - wsServer *shttp.WSStructServer - analyzerClientPool *shttp.WSStructClientPool + wsServer *ws.WSStructServer + analyzerClientPool *ws.WSStructClientPool topologyEndpoint *topology.TopologySubscriberEndpoint rootNode *graph.Node topologyProbeBundle *probe.ProbeBundle @@ -66,8 +67,8 @@ type Agent struct { // NewAnalyzerWSStructClientPool creates a new http WebSocket client Pool // with authentification -func NewAnalyzerWSStructClientPool(authOptions *shttp.AuthenticationOpts) (*shttp.WSStructClientPool, error) { - pool := shttp.NewWSStructClientPool("AnalyzerClientPool") +func NewAnalyzerWSStructClientPool(authOptions *shttp.AuthenticationOpts) (*ws.WSStructClientPool, error) { + pool := ws.NewWSStructClientPool("AnalyzerClientPool") addresses, err := config.GetAnalyzerServiceAddresses() if err != nil { @@ -80,7 +81,7 @@ func NewAnalyzerWSStructClientPool(authOptions *shttp.AuthenticationOpts) (*shtt } for _, sa := range addresses { - c := shttp.NewWSClientFromConfig(common.AgentService, config.GetURL("ws", sa.Addr, sa.Port, "/ws/agent"), authOptions, nil) + c := ws.NewWSClientFromConfig(common.AgentService, config.GetURL("ws", sa.Addr, sa.Port, "/ws/agent"), authOptions, nil) pool.AddClient(c) } @@ -89,13 +90,13 @@ func NewAnalyzerWSStructClientPool(authOptions *shttp.AuthenticationOpts) (*shtt // AnalyzerConnStatus represents the status of a connection to an analyzer type AnalyzerConnStatus struct { - shttp.WSConnStatus + ws.ConnStatus IsMaster bool } // AgentStatus represents the status of an agent type AgentStatus struct { - Clients map[string]shttp.WSConnStatus + Clients map[string]ws.ConnStatus Analyzers map[string]AnalyzerConnStatus TopologyProbes []string FlowProbes []string @@ -112,8 +113,8 @@ func (a *Agent) GetStatus() interface{} { analyzers := make(map[string]AnalyzerConnStatus) for id, status := range a.analyzerClientPool.GetStatus() { analyzers[id] = AnalyzerConnStatus{ - WSConnStatus: status, - IsMaster: status.Addr == masterAddr && status.Port == masterPort, + ConnStatus: status, + IsMaster: status.Addr == masterAddr && status.Port == masterPort, } } @@ -192,7 +193,7 @@ func NewAgent() (*Agent, error) { return nil, err } - wsServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/subscriber", apiAuthBackend)) + wsServer := ws.NewWSStructServer(ws.NewWSServer(hserver, "/ws/subscriber", apiAuthBackend)) // declare all extension available throught API and filtering tr := traversal.NewGremlinTraversalParser() diff --git a/agent/topology_forwarder.go b/agent/topology_forwarder.go index 436e3ebb19..5bc70417c3 100644 --- a/agent/topology_forwarder.go +++ b/agent/topology_forwarder.go @@ -24,16 +24,16 @@ package agent import ( "github.com/skydive-project/skydive/config" - shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/topology/graph" + ws "github.com/skydive-project/skydive/websocket" ) // TopologyForwarder forwards the topology to only one master server. // When switching from one analyzer to another one the agent does a full // re-sync since some messages could have been lost. type TopologyForwarder struct { - masterElection *shttp.WSMasterElection + masterElection *ws.WSMasterElection graph *graph.Graph host string } @@ -45,15 +45,15 @@ func (t *TopologyForwarder) triggerResync() { defer t.graph.RUnlock() // request for deletion of everything belonging this host - t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.HostGraphDeletedMsgType, t.host)) + t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.HostGraphDeletedMsgType, t.host)) // re-add all the nodes and edges - t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.SyncMsgType, t.graph)) + t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.SyncMsgType, t.graph)) } // OnNewMaster is called by the master election mechanism when a new master is elected. In // such case a "Re-sync" is triggerd in order to be in sync with the new master. -func (t *TopologyForwarder) OnNewMaster(c shttp.WSSpeaker) { +func (t *TopologyForwarder) OnNewMaster(c ws.WSSpeaker) { if c == nil { logging.GetLogger().Warn("Lost connection to master") } else { @@ -65,43 +65,43 @@ func (t *TopologyForwarder) OnNewMaster(c shttp.WSSpeaker) { // OnNodeUpdated graph node updated event. Implements the GraphEventListener interface. func (t *TopologyForwarder) OnNodeUpdated(n *graph.Node) { - t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.NodeUpdatedMsgType, n)) + t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.NodeUpdatedMsgType, n)) } // OnNodeAdded graph node added event. Implements the GraphEventListener interface. func (t *TopologyForwarder) OnNodeAdded(n *graph.Node) { - t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.NodeAddedMsgType, n)) + t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.NodeAddedMsgType, n)) } // OnNodeDeleted graph node deleted event. Implements the GraphEventListener interface. func (t *TopologyForwarder) OnNodeDeleted(n *graph.Node) { - t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.NodeDeletedMsgType, n)) + t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.NodeDeletedMsgType, n)) } // OnEdgeUpdated graph edge updated event. Implements the GraphEventListener interface. func (t *TopologyForwarder) OnEdgeUpdated(e *graph.Edge) { - t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.EdgeUpdatedMsgType, e)) + t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.EdgeUpdatedMsgType, e)) } // OnEdgeAdded graph edge added event. Implements the GraphEventListener interface. func (t *TopologyForwarder) OnEdgeAdded(e *graph.Edge) { - t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.EdgeAddedMsgType, e)) + t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.EdgeAddedMsgType, e)) } // OnEdgeDeleted graph edge deleted event. Implements the GraphEventListener interface. func (t *TopologyForwarder) OnEdgeDeleted(e *graph.Edge) { - t.masterElection.SendMessageToMaster(shttp.NewWSStructMessage(graph.Namespace, graph.EdgeDeletedMsgType, e)) + t.masterElection.SendMessageToMaster(ws.NewWSStructMessage(graph.Namespace, graph.EdgeDeletedMsgType, e)) } // GetMaster returns the current analyzer the agent is sending its events to -func (t *TopologyForwarder) GetMaster() shttp.WSSpeaker { +func (t *TopologyForwarder) GetMaster() ws.WSSpeaker { return t.masterElection.GetMaster() } // NewTopologyForwarder returns a new Graph forwarder which forwards event of the given graph // to the given WebSocket JSON speakers. -func NewTopologyForwarder(host string, g *graph.Graph, pool shttp.WSStructSpeakerPool) *TopologyForwarder { - masterElection := shttp.NewWSMasterElection(pool) +func NewTopologyForwarder(host string, g *graph.Graph, pool ws.WSStructSpeakerPool) *TopologyForwarder { + masterElection := ws.NewWSMasterElection(pool) t := &TopologyForwarder{ masterElection: masterElection, @@ -116,7 +116,7 @@ func NewTopologyForwarder(host string, g *graph.Graph, pool shttp.WSStructSpeake } // NewTopologyForwarderFromConfig creates a TopologyForwarder from configuration -func NewTopologyForwarderFromConfig(g *graph.Graph, pool shttp.WSStructSpeakerPool) *TopologyForwarder { +func NewTopologyForwarderFromConfig(g *graph.Graph, pool ws.WSStructSpeakerPool) *TopologyForwarder { host := config.GetString("host_id") return NewTopologyForwarder(host, g, pool) } diff --git a/alert/server.go b/alert/server.go index 507fe27379..13c843ec03 100644 --- a/alert/server.go +++ b/alert/server.go @@ -37,11 +37,11 @@ import ( "github.com/skydive-project/skydive/api/types" "github.com/skydive-project/skydive/common" "github.com/skydive-project/skydive/etcd" - shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/js" "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/topology/graph" "github.com/skydive-project/skydive/topology/graph/traversal" + ws "github.com/skydive-project/skydive/websocket" ) const ( @@ -194,7 +194,7 @@ type Server struct { common.RWMutex *etcd.MasterElector Graph *graph.Graph - Pool shttp.WSStructSpeakerPool + Pool ws.WSStructSpeakerPool AlertHandler api.Handler apiServer *api.Server watcher api.StoppableWatcher @@ -232,7 +232,7 @@ func (a *Server) triggerAlert(al *GremlinAlert, data interface{}) error { } }() - wsMsg := shttp.NewWSStructMessage(Namespace, "Alert", msg) + wsMsg := ws.NewWSStructMessage(Namespace, "Alert", msg) a.Pool.BroadcastMessage(wsMsg) logging.GetLogger().Debugf("Alert %s of type %s was triggerred", al.UUID, al.Action) @@ -404,7 +404,7 @@ func (a *Server) Stop() { } // Returns a new alerting server -func NewServer(apiServer *api.Server, pool shttp.WSStructSpeakerPool, graph *graph.Graph, parser *traversal.GremlinTraversalParser, etcdClient *etcd.Client) (*Server, error) { +func NewServer(apiServer *api.Server, pool ws.WSStructSpeakerPool, graph *graph.Graph, parser *traversal.GremlinTraversalParser, etcdClient *etcd.Client) (*Server, error) { elector := etcd.NewMasterElectorFromConfig(common.AnalyzerService, "alert-server", etcdClient) jsre, err := js.NewJSRE() diff --git a/analyzer/flow_client.go b/analyzer/flow_client.go index 3fa8a6a32a..ef8defee1f 100644 --- a/analyzer/flow_client.go +++ b/analyzer/flow_client.go @@ -33,14 +33,14 @@ import ( "github.com/skydive-project/skydive/common" "github.com/skydive-project/skydive/config" "github.com/skydive-project/skydive/flow" - shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" + ws "github.com/skydive-project/skydive/websocket" ) // FlowClientPool describes a flow client pool. type FlowClientPool struct { common.RWMutex - shttp.DefaultWSSpeakerEventHandler + ws.DefaultWSSpeakerEventHandler flowClients []*FlowClient } @@ -66,9 +66,9 @@ type FlowClientUDPConn struct { // FlowClientWebSocketConn describes WebSocket client connection type FlowClientWebSocketConn struct { - shttp.DefaultWSSpeakerEventHandler + ws.DefaultWSSpeakerEventHandler url *url.URL - wsClient *shttp.WSClient + wsClient *ws.WSClient } // Close the connection @@ -108,7 +108,7 @@ func (c *FlowClientWebSocketConn) Close() error { // Connect to the WebSocket flow server func (c *FlowClientWebSocketConn) Connect() error { authOptions := AnalyzerClusterAuthenticationOpts() - c.wsClient = shttp.NewWSClientFromConfig(common.AgentService, c.url, authOptions, nil) + c.wsClient = ws.NewWSClientFromConfig(common.AgentService, c.url, authOptions, nil) c.wsClient.Connect() c.wsClient.AddEventHandler(c) @@ -195,7 +195,7 @@ func NewFlowClient(addr string, port int) (*FlowClient, error) { } // OnConnected websocket event handler -func (p *FlowClientPool) OnConnected(c shttp.WSSpeaker) { +func (p *FlowClientPool) OnConnected(c ws.WSSpeaker) { p.Lock() defer p.Unlock() @@ -219,7 +219,7 @@ func (p *FlowClientPool) OnConnected(c shttp.WSSpeaker) { } // OnDisconnected websocket event handler -func (p *FlowClientPool) OnDisconnected(c shttp.WSSpeaker) { +func (p *FlowClientPool) OnDisconnected(c ws.WSSpeaker) { p.Lock() defer p.Unlock() @@ -256,7 +256,7 @@ func (p *FlowClientPool) Close() { // NewFlowClientPool returns a new FlowClientPool using the websocket connections // to maintain the pool of client up to date according to the websocket connections // status. -func NewFlowClientPool(pool shttp.WSSpeakerPool) *FlowClientPool { +func NewFlowClientPool(pool ws.WSSpeakerPool) *FlowClientPool { p := &FlowClientPool{ flowClients: make([]*FlowClient, 0), } diff --git a/analyzer/flow_server.go b/analyzer/flow_server.go index 6c03e5840e..a850ed670b 100644 --- a/analyzer/flow_server.go +++ b/analyzer/flow_server.go @@ -41,6 +41,7 @@ import ( "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/probe" "github.com/skydive-project/skydive/topology/graph" + ws "github.com/skydive-project/skydive/websocket" ) const ( @@ -76,7 +77,7 @@ type FlowServerUDPConn struct { // FlowServerWebSocketConn describes a WebSocket flow server connection type FlowServerWebSocketConn struct { - shttp.DefaultWSSpeakerEventHandler + ws.DefaultWSSpeakerEventHandler server *shttp.Server ch chan *flow.Flow timeOfLastLostFlowsLog time.Time @@ -101,7 +102,7 @@ type FlowServer struct { } // OnMessage event -func (c *FlowServerWebSocketConn) OnMessage(client shttp.WSSpeaker, m shttp.WSMessage) { +func (c *FlowServerWebSocketConn) OnMessage(client ws.WSSpeaker, m ws.WSMessage) { f, err := flow.FromData(m.Bytes(client.GetClientProtocol())) if err != nil { logging.GetLogger().Errorf("Error while parsing flow: %s", err.Error()) @@ -124,7 +125,7 @@ func (c *FlowServerWebSocketConn) OnMessage(client shttp.WSSpeaker, m shttp.WSMe // Serve starts a WebSocket flow server func (c *FlowServerWebSocketConn) Serve(ch chan *flow.Flow, quit chan struct{}, wg *sync.WaitGroup) { c.ch = ch - server := shttp.NewWSServer(c.server, "/ws/flow", c.auth) + server := ws.NewWSServer(c.server, "/ws/flow", c.auth) server.AddEventHandler(c) go func() { server.Start() diff --git a/analyzer/server.go b/analyzer/server.go index 1df0796ad9..d5625d9203 100644 --- a/analyzer/server.go +++ b/analyzer/server.go @@ -48,15 +48,38 @@ import ( "github.com/skydive-project/skydive/topology/enhancers" "github.com/skydive-project/skydive/topology/graph" "github.com/skydive-project/skydive/topology/graph/traversal" + ws "github.com/skydive-project/skydive/websocket" ) +// ElectionStatus describes the status of an election +type ElectionStatus struct { + IsMaster bool +} + +// PeersStatus describes the state of a peer +type PeersStatus struct { + Incomers map[string]ws.ConnStatus + Outgoers map[string]ws.ConnStatus +} + +// Status describes the status of an analyzer +type Status struct { + Agents map[string]ws.ConnStatus + Peers PeersStatus + Publishers map[string]ws.ConnStatus + Subscribers map[string]ws.ConnStatus + Alerts ElectionStatus + Captures ElectionStatus + Probes []string +} + // Server describes an Analyzer servers mechanism like http, websocket, topology, ondemand probes, ... type Server struct { httpServer *shttp.Server - agentWSServer *shttp.WSStructServer - publisherWSServer *shttp.WSStructServer - replicationWSServer *shttp.WSStructServer - subscriberWSServer *shttp.WSStructServer + agentWSServer *ws.WSStructServer + publisherWSServer *ws.WSStructServer + replicationWSServer *ws.WSStructServer + subscriberWSServer *ws.WSStructServer replicationEndpoint *TopologyReplicationEndpoint alertServer *alert.Server onDemandClient *ondemand.OnDemandProbeClient @@ -73,9 +96,9 @@ type Server struct { // GetStatus returns the status of an analyzer func (s *Server) GetStatus() interface{} { - peersStatus := types.PeersStatus{ - Incomers: make(map[string]shttp.WSConnStatus), - Outgoers: make(map[string]shttp.WSConnStatus), + peersStatus := PeersStatus{ + Incomers: make(map[string]ws.ConnStatus), + Outgoers: make(map[string]ws.ConnStatus), } for _, speaker := range s.replicationEndpoint.in.GetSpeakers() { @@ -86,13 +109,13 @@ func (s *Server) GetStatus() interface{} { peersStatus.Outgoers[speaker.GetRemoteHost()] = speaker.GetStatus() } - return &types.AnalyzerStatus{ + return &Status{ Agents: s.agentWSServer.GetStatus(), Peers: peersStatus, Publishers: s.publisherWSServer.GetStatus(), Subscribers: s.subscriberWSServer.GetStatus(), - Alerts: types.ElectionStatus{IsMaster: s.alertServer.IsMaster()}, - Captures: types.ElectionStatus{IsMaster: s.onDemandClient.IsMaster()}, + Alerts: ElectionStatus{IsMaster: s.alertServer.IsMaster()}, + Captures: ElectionStatus{IsMaster: s.onDemandClient.IsMaster()}, Probes: s.probeBundle.ActiveProbes(), } } @@ -251,13 +274,13 @@ func NewServerFromConfig() (*Server, error) { hserver.RegisterLoginRoute(apiAuthBackend) - agentWSServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/agent", clusterAuthBackend)) + agentWSServer := ws.NewWSStructServer(ws.NewWSServer(hserver, "/ws/agent", clusterAuthBackend)) _, err = NewTopologyAgentEndpoint(agentWSServer, cached, g) if err != nil { return nil, err } - publisherWSServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/publisher", apiAuthBackend)) + publisherWSServer := ws.NewWSStructServer(ws.NewWSServer(hserver, "/ws/publisher", apiAuthBackend)) _, err = NewTopologyPublisherEndpoint(publisherWSServer, g) if err != nil { return nil, err @@ -267,7 +290,7 @@ func NewServerFromConfig() (*Server, error) { storage, err := storage.NewStorageFromConfig(etcdClient) - replicationWSServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/replication", clusterAuthBackend)) + replicationWSServer := ws.NewWSStructServer(ws.NewWSServer(hserver, "/ws/replication", clusterAuthBackend)) replicationEndpoint, err := NewTopologyReplicationEndpoint(replicationWSServer, clusterAuthOptions, cached, g) if err != nil { return nil, err @@ -281,7 +304,7 @@ func NewServerFromConfig() (*Server, error) { tr.AddTraversalExtension(ge.NewSocketsTraversalExtension()) tr.AddTraversalExtension(ge.NewDescendantsTraversalExtension()) - subscriberWSServer := shttp.NewWSStructServer(shttp.NewWSServer(hserver, "/ws/subscriber", apiAuthBackend)) + subscriberWSServer := ws.NewWSStructServer(ws.NewWSServer(hserver, "/ws/subscriber", apiAuthBackend)) topology.NewTopologySubscriberEndpoint(subscriberWSServer, g, tr) probeBundle, err := NewTopologyProbeBundleFromConfig(g) diff --git a/analyzer/topology_agent_endpoint.go b/analyzer/topology_agent_endpoint.go index 22f2f0d4b3..b7c7423df7 100644 --- a/analyzer/topology_agent_endpoint.go +++ b/analyzer/topology_agent_endpoint.go @@ -26,23 +26,23 @@ import ( "sync" "github.com/skydive-project/skydive/common" - shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/topology/graph" + ws "github.com/skydive-project/skydive/websocket" ) // TopologyAgentEndpoint serves the graph for agents. type TopologyAgentEndpoint struct { common.RWMutex - shttp.DefaultWSSpeakerEventHandler - pool shttp.WSStructSpeakerPool + ws.DefaultWSSpeakerEventHandler + pool ws.WSStructSpeakerPool Graph *graph.Graph cached *graph.CachedBackend wg sync.WaitGroup } // OnDisconnected called when an agent disconnected. -func (t *TopologyAgentEndpoint) OnDisconnected(c shttp.WSSpeaker) { +func (t *TopologyAgentEndpoint) OnDisconnected(c ws.WSSpeaker) { host := c.GetRemoteHost() if host == "" { return @@ -55,7 +55,7 @@ func (t *TopologyAgentEndpoint) OnDisconnected(c shttp.WSSpeaker) { } // OnWSStructMessage is triggered when a message from the agent is received. -func (t *TopologyAgentEndpoint) OnWSStructMessage(c shttp.WSSpeaker, msg *shttp.WSStructMessage) { +func (t *TopologyAgentEndpoint) OnWSStructMessage(c ws.WSSpeaker, msg *ws.WSStructMessage) { msgType, obj, err := graph.UnmarshalWSMessage(msg) if err != nil { logging.GetLogger().Errorf("Graph: Unable to parse the event %v: %s", msg, err) @@ -100,7 +100,7 @@ func (t *TopologyAgentEndpoint) OnWSStructMessage(c shttp.WSSpeaker, msg *shttp. } // NewTopologyAgentEndpoint returns a new server that handles messages from the agents -func NewTopologyAgentEndpoint(pool shttp.WSStructSpeakerPool, cached *graph.CachedBackend, g *graph.Graph) (*TopologyAgentEndpoint, error) { +func NewTopologyAgentEndpoint(pool ws.WSStructSpeakerPool, cached *graph.CachedBackend, g *graph.Graph) (*TopologyAgentEndpoint, error) { t := &TopologyAgentEndpoint{ Graph: g, pool: pool, diff --git a/analyzer/topology_publisher_endpoint.go b/analyzer/topology_publisher_endpoint.go index 67071a6751..df4700dcbc 100644 --- a/analyzer/topology_publisher_endpoint.go +++ b/analyzer/topology_publisher_endpoint.go @@ -27,11 +27,12 @@ import ( "sync" "github.com/skydive-project/skydive/common" - shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/statics" "github.com/skydive-project/skydive/topology/graph" "github.com/skydive-project/skydive/topology/graph/traversal" + ws "github.com/skydive-project/skydive/websocket" + "github.com/xeipuuv/gojsonschema" ) @@ -49,8 +50,8 @@ const ( // an external program that interacts with the Skydive graph. type TopologyPublisherEndpoint struct { common.RWMutex - shttp.DefaultWSSpeakerEventHandler - pool shttp.WSStructSpeakerPool + ws.DefaultWSSpeakerEventHandler + pool ws.WSStructSpeakerPool Graph *graph.Graph cached *graph.CachedBackend nodeSchema gojsonschema.JSONLoader @@ -60,7 +61,7 @@ type TopologyPublisherEndpoint struct { } // OnDisconnected called when a publisher got disconnected. -func (t *TopologyPublisherEndpoint) OnDisconnected(c shttp.WSSpeaker) { +func (t *TopologyPublisherEndpoint) OnDisconnected(c ws.WSSpeaker) { policy := PersistencePolicy(c.GetHeaders().Get("X-Persistence-Policy")) if policy == Persistent { return @@ -76,7 +77,7 @@ func (t *TopologyPublisherEndpoint) OnDisconnected(c shttp.WSSpeaker) { } // OnWSStructMessage is triggered by message coming from a publisher. -func (t *TopologyPublisherEndpoint) OnWSStructMessage(c shttp.WSSpeaker, msg *shttp.WSStructMessage) { +func (t *TopologyPublisherEndpoint) OnWSStructMessage(c ws.WSSpeaker, msg *ws.WSStructMessage) { msgType, obj, err := graph.UnmarshalWSMessage(msg) if err != nil { logging.GetLogger().Errorf("Graph: Unable to parse the event %v: %s", msg, err) @@ -146,7 +147,7 @@ func (t *TopologyPublisherEndpoint) OnWSStructMessage(c shttp.WSSpeaker, msg *sh } // NewTopologyPublisherEndpoint returns a new server for external publishers. -func NewTopologyPublisherEndpoint(pool shttp.WSStructSpeakerPool, g *graph.Graph) (*TopologyPublisherEndpoint, error) { +func NewTopologyPublisherEndpoint(pool ws.WSStructSpeakerPool, g *graph.Graph) (*TopologyPublisherEndpoint, error) { nodeSchema, err := statics.Asset("statics/schemas/node.schema") if err != nil { return nil, err diff --git a/analyzer/topology_replication_endpoint.go b/analyzer/topology_replication_endpoint.go index 489167d428..39e6c4de4d 100644 --- a/analyzer/topology_replication_endpoint.go +++ b/analyzer/topology_replication_endpoint.go @@ -34,17 +34,18 @@ import ( shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/topology/graph" + ws "github.com/skydive-project/skydive/websocket" ) // TopologyReplicatorPeer is a remote connection to another Graph server. Only modification // of the local Graph made either by the local server, by an agent message or by an external // client will be forwarded to the peer. type TopologyReplicatorPeer struct { - shttp.DefaultWSSpeakerEventHandler + ws.DefaultWSSpeakerEventHandler URL *url.URL Graph *graph.Graph AuthOptions *shttp.AuthenticationOpts - wsspeaker shttp.WSSpeaker + wsspeaker ws.WSSpeaker endpoint *TopologyReplicationEndpoint host string ephemeral bool @@ -53,10 +54,10 @@ type TopologyReplicatorPeer struct { // TopologyReplicationEndpoint serves the local Graph and send local modification to its peers. type TopologyReplicationEndpoint struct { common.RWMutex - shttp.DefaultWSSpeakerEventHandler - in shttp.WSStructSpeakerPool - out *shttp.WSStructClientPool - inByHost map[string]shttp.WSSpeaker + ws.DefaultWSSpeakerEventHandler + in ws.WSStructSpeakerPool + out *ws.WSStructClientPool + inByHost map[string]ws.WSSpeaker outByHost map[string]*url.URL candidates []*TopologyReplicatorPeer Graph *graph.Graph @@ -71,7 +72,7 @@ func (t *TopologyReplicationEndpoint) debug() bool { // OnConnected is called when the peer gets connected then the whole graph // is send to initialize it. -func (p *TopologyReplicatorPeer) OnConnected(c shttp.WSSpeaker) { +func (p *TopologyReplicatorPeer) OnConnected(c ws.WSSpeaker) { p.Graph.RLock() defer p.Graph.RUnlock() @@ -93,14 +94,14 @@ func (p *TopologyReplicatorPeer) OnConnected(c shttp.WSSpeaker) { return } - p.wsspeaker.SendMessage(shttp.NewWSStructMessage(graph.Namespace, graph.SyncMsgType, p.Graph)) + p.wsspeaker.SendMessage(ws.NewWSStructMessage(graph.Namespace, graph.SyncMsgType, p.Graph)) p.endpoint.out.AddClient(c) p.endpoint.outByHost[host] = c.GetURL() } // OnDisconnected is called when the peer gets disconnected -func (p *TopologyReplicatorPeer) OnDisconnected(c shttp.WSSpeaker) { +func (p *TopologyReplicatorPeer) OnDisconnected(c ws.WSSpeaker) { p.endpoint.Lock() defer p.endpoint.Unlock() @@ -111,7 +112,7 @@ func (p *TopologyReplicatorPeer) connect(wg *sync.WaitGroup) { defer wg.Done() logging.GetLogger().Infof("Connecting to peer: %s", p.URL.String()) - wsClient := shttp.NewWSClientFromConfig(common.AnalyzerService, p.URL, p.AuthOptions, http.Header{}).UpgradeToWSStructSpeaker() + wsClient := ws.NewWSClientFromConfig(common.AnalyzerService, p.URL, p.AuthOptions, http.Header{}).UpgradeToWSStructSpeaker() // will trigger shttp.WSSpeakerEventHandler, so OnConnected wsClient.AddEventHandler(p) @@ -170,7 +171,7 @@ func (t *TopologyReplicationEndpoint) DisconnectPeers() { } // OnWSStructMessage is triggered by message coming from an other peer. -func (t *TopologyReplicationEndpoint) OnWSStructMessage(c shttp.WSSpeaker, msg *shttp.WSStructMessage) { +func (t *TopologyReplicationEndpoint) OnWSStructMessage(c ws.WSSpeaker, msg *ws.WSStructMessage) { host := c.GetRemoteHost() if host == config.GetString("host_id") { logging.GetLogger().Debugf("Ignore message from myself(%s), %s", c.GetURL().String()) @@ -230,9 +231,9 @@ func (t *TopologyReplicationEndpoint) OnWSStructMessage(c shttp.WSSpeaker, msg * } // SendToPeers sends the message to all the peers -func (t *TopologyReplicationEndpoint) notifyPeers(msg *shttp.WSStructMessage) { +func (t *TopologyReplicationEndpoint) notifyPeers(msg *ws.WSStructMessage) { if t.debug() { - logging.GetLogger().Debugf("Broadcasting message to all peers: (protobuf) %s", msg.Bytes(shttp.ProtobufProtocol)) + logging.GetLogger().Debugf("Broadcasting message to all peers: (protobuf) %s", msg.Bytes(ws.ProtobufProtocol)) } t.in.BroadcastMessage(msg) t.out.BroadcastMessage(msg) @@ -241,7 +242,7 @@ func (t *TopologyReplicationEndpoint) notifyPeers(msg *shttp.WSStructMessage) { // OnNodeUpdated graph node updated event. Implements the GraphEventListener interface. func (t *TopologyReplicationEndpoint) OnNodeUpdated(n *graph.Node) { if t.replicateMsg.Load() == true { - msg := shttp.NewWSStructMessage(graph.Namespace, graph.NodeUpdatedMsgType, n) + msg := ws.NewWSStructMessage(graph.Namespace, graph.NodeUpdatedMsgType, n) t.notifyPeers(msg) } } @@ -249,7 +250,7 @@ func (t *TopologyReplicationEndpoint) OnNodeUpdated(n *graph.Node) { // OnNodeAdded graph node added event. Implements the GraphEventListener interface. func (t *TopologyReplicationEndpoint) OnNodeAdded(n *graph.Node) { if t.replicateMsg.Load() == true { - msg := shttp.NewWSStructMessage(graph.Namespace, graph.NodeAddedMsgType, n) + msg := ws.NewWSStructMessage(graph.Namespace, graph.NodeAddedMsgType, n) t.notifyPeers(msg) } } @@ -257,7 +258,7 @@ func (t *TopologyReplicationEndpoint) OnNodeAdded(n *graph.Node) { // OnNodeDeleted graph node deleted event. Implements the GraphEventListener interface. func (t *TopologyReplicationEndpoint) OnNodeDeleted(n *graph.Node) { if t.replicateMsg.Load() == true { - msg := shttp.NewWSStructMessage(graph.Namespace, graph.NodeDeletedMsgType, n) + msg := ws.NewWSStructMessage(graph.Namespace, graph.NodeDeletedMsgType, n) t.notifyPeers(msg) } } @@ -265,7 +266,7 @@ func (t *TopologyReplicationEndpoint) OnNodeDeleted(n *graph.Node) { // OnEdgeUpdated graph edge updated event. Implements the GraphEventListener interface. func (t *TopologyReplicationEndpoint) OnEdgeUpdated(e *graph.Edge) { if t.replicateMsg.Load() == true { - msg := shttp.NewWSStructMessage(graph.Namespace, graph.EdgeUpdatedMsgType, e) + msg := ws.NewWSStructMessage(graph.Namespace, graph.EdgeUpdatedMsgType, e) t.notifyPeers(msg) } } @@ -273,7 +274,7 @@ func (t *TopologyReplicationEndpoint) OnEdgeUpdated(e *graph.Edge) { // OnEdgeAdded graph edge added event. Implements the GraphEventListener interface. func (t *TopologyReplicationEndpoint) OnEdgeAdded(e *graph.Edge) { if t.replicateMsg.Load() == true { - msg := shttp.NewWSStructMessage(graph.Namespace, graph.EdgeAddedMsgType, e) + msg := ws.NewWSStructMessage(graph.Namespace, graph.EdgeAddedMsgType, e) t.notifyPeers(msg) } } @@ -281,18 +282,18 @@ func (t *TopologyReplicationEndpoint) OnEdgeAdded(e *graph.Edge) { // OnEdgeDeleted graph edge deleted event. Implements the GraphEventListener interface. func (t *TopologyReplicationEndpoint) OnEdgeDeleted(e *graph.Edge) { if t.replicateMsg.Load() == true { - msg := shttp.NewWSStructMessage(graph.Namespace, graph.EdgeDeletedMsgType, e) + msg := ws.NewWSStructMessage(graph.Namespace, graph.EdgeDeletedMsgType, e) t.notifyPeers(msg) } } // GetSpeakers return both incoming and outgoing speakers -func (t *TopologyReplicationEndpoint) GetSpeakers() []shttp.WSSpeaker { +func (t *TopologyReplicationEndpoint) GetSpeakers() []ws.WSSpeaker { return append(t.in.GetSpeakers(), t.out.GetSpeakers()...) } // OnConnected is called when an incoming peer got connected. -func (t *TopologyReplicationEndpoint) OnConnected(c shttp.WSSpeaker) { +func (t *TopologyReplicationEndpoint) OnConnected(c ws.WSSpeaker) { t.Graph.RLock() defer t.Graph.RUnlock() @@ -307,12 +308,12 @@ func (t *TopologyReplicationEndpoint) OnConnected(c shttp.WSSpeaker) { } // subscribe to websocket structured messages - c.(*shttp.WSStructSpeaker).AddStructMessageHandler(t, []string{graph.Namespace}) - c.SendMessage(shttp.NewWSStructMessage(graph.Namespace, graph.SyncMsgType, t.Graph)) + c.(*ws.WSStructSpeaker).AddStructMessageHandler(t, []string{graph.Namespace}) + c.SendMessage(ws.NewWSStructMessage(graph.Namespace, graph.SyncMsgType, t.Graph)) } // OnDisconnected is called when an incoming peer got disconnected. -func (t *TopologyReplicationEndpoint) OnDisconnected(c shttp.WSSpeaker) { +func (t *TopologyReplicationEndpoint) OnDisconnected(c ws.WSSpeaker) { t.Lock() defer t.Unlock() @@ -320,7 +321,7 @@ func (t *TopologyReplicationEndpoint) OnDisconnected(c shttp.WSSpeaker) { } // NewTopologyReplicationEndpoint returns a new server to be used by other analyzers for replication. -func NewTopologyReplicationEndpoint(pool shttp.WSStructSpeakerPool, auth *shttp.AuthenticationOpts, cached *graph.CachedBackend, g *graph.Graph) (*TopologyReplicationEndpoint, error) { +func NewTopologyReplicationEndpoint(pool ws.WSStructSpeakerPool, auth *shttp.AuthenticationOpts, cached *graph.CachedBackend, g *graph.Graph) (*TopologyReplicationEndpoint, error) { addresses, err := config.GetAnalyzerServiceAddresses() if err != nil { return nil, fmt.Errorf("Unable to get the analyzers list: %s", err) @@ -330,8 +331,8 @@ func NewTopologyReplicationEndpoint(pool shttp.WSStructSpeakerPool, auth *shttp. Graph: g, cached: cached, in: pool, - out: shttp.NewWSStructClientPool("TopologyReplicationEndpoint"), - inByHost: make(map[string]shttp.WSSpeaker), + out: ws.NewWSStructClientPool("TopologyReplicationEndpoint"), + inByHost: make(map[string]ws.WSSpeaker), outByHost: make(map[string]*url.URL), } t.replicateMsg.Store(true) diff --git a/api/types/types.go b/api/types/types.go index de17c07631..b7932a641f 100644 --- a/api/types/types.go +++ b/api/types/types.go @@ -26,7 +26,6 @@ import ( "errors" "time" - shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/topology/graph" ) @@ -69,17 +68,6 @@ func NewAlert() *Alert { } } -// AnalyzerStatus describes the status of an analyzer -type AnalyzerStatus struct { - Agents map[string]shttp.WSConnStatus - Peers PeersStatus - Publishers map[string]shttp.WSConnStatus - Subscribers map[string]shttp.WSConnStatus - Alerts ElectionStatus - Captures ElectionStatus - Probes []string -} - // Capture describes a capture API type Capture struct { BasicResource @@ -107,9 +95,47 @@ func NewCapture(query string, bpfFilter string) *Capture { } } -// ElectionStatus describes the status of an election -type ElectionStatus struct { - IsMaster bool +// EdgeRule describes a edge rule +type EdgeRule struct { + UUID string + Name string + Description string + Src string `valid:"isGremlinExpr"` + Dst string `valid:"isGremlinExpr"` + RelationType string `valid:"regexp=^(layer2|ownership|both)$"` + Metadata graph.Metadata +} + +// ID returns the edge rule ID +func (e *EdgeRule) ID() string { + return e.UUID +} + +// SetID set ID +func (e *EdgeRule) SetID(id string) { + e.UUID = id +} + +// NodeRule describes a node rule +type NodeRule struct { + UUID string + Name string + Description string + NodeType string + NodeName string + Metadata graph.Metadata + Action string `valid:"regexp=^(create|update)$"` + Query string `valid:"isGremlinOrEmpty"` +} + +// ID returns the node rule ID +func (n *NodeRule) ID() string { + return n.UUID +} + +// SetID set ID +func (n *NodeRule) SetID(id string) { + n.UUID = id } // PacketInjection packet injector API parameters @@ -142,12 +168,6 @@ func (pi *PacketInjection) Validate() error { return nil } -// PeersStatus describes the state of a peer -type PeersStatus struct { - Incomers map[string]shttp.WSConnStatus - Outgoers map[string]shttp.WSConnStatus -} - // TopologyParam topology API parameter type TopologyParam struct { GremlinQuery string `json:"GremlinQuery,omitempty" valid:"isGremlinExpr"` @@ -191,46 +211,3 @@ type Workflow struct { Parameters []WorkflowParam `yaml:"parameters"` Source string `valid:"isValidWorkflow" yaml:"source"` } - -// NodeRule describes a node rule -type NodeRule struct { - UUID string - Name string - Description string - NodeType string - NodeName string - Metadata graph.Metadata - Action string `valid:"regexp=^(create|update)$"` - Query string `valid:"isGremlinOrEmpty"` -} - -// ID returns the node rule ID -func (n *NodeRule) ID() string { - return n.UUID -} - -// SetID set ID -func (n *NodeRule) SetID(id string) { - n.UUID = id -} - -// EdgeRule describes a edge rule -type EdgeRule struct { - UUID string - Name string - Description string - Src string `valid:"isGremlinExpr"` - Dst string `valid:"isGremlinExpr"` - RelationType string `valid:"regexp=^(layer2|ownership|both)$"` - Metadata graph.Metadata -} - -// ID returns the edge rule ID -func (e *EdgeRule) ID() string { - return e.UUID -} - -// SetID set ID -func (e *EdgeRule) SetID(id string) { - e.UUID = id -} diff --git a/cmd/client/status.go b/cmd/client/status.go index 33708b302d..1341df552f 100644 --- a/cmd/client/status.go +++ b/cmd/client/status.go @@ -27,8 +27,8 @@ import ( "net/http" "os" + "github.com/skydive-project/skydive/analyzer" "github.com/skydive-project/skydive/api/client" - "github.com/skydive-project/skydive/api/types" "github.com/skydive-project/skydive/common" "github.com/skydive-project/skydive/logging" @@ -59,7 +59,7 @@ var StatusCmd = &cobra.Command{ os.Exit(1) } - var status types.AnalyzerStatus + var status analyzer.Status if err := common.JSONDecode(resp.Body, &status); err != nil { logging.GetLogger().Error(err.Error()) os.Exit(1) diff --git a/flow/client.go b/flow/client.go index f9ea4b42de..3d91bc93c4 100644 --- a/flow/client.go +++ b/flow/client.go @@ -27,22 +27,22 @@ import ( "github.com/skydive-project/skydive/common" "github.com/skydive-project/skydive/filters" - shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/topology" + ws "github.com/skydive-project/skydive/websocket" ) // TableClient describes a mechanism to Query a flow table via flowSet in JSON type TableClient struct { - WSStructServer *shttp.WSStructServer + WSStructServer *ws.WSStructServer } func (f *TableClient) lookupFlows(flowset chan *FlowSet, host string, flowSearchQuery filters.SearchQuery) { obj, _ := proto.Marshal(&flowSearchQuery) tq := TableQuery{Type: "SearchQuery", Obj: obj} - msg := shttp.NewWSStructMessage(Namespace, "TableQuery", tq) + msg := ws.NewWSStructMessage(Namespace, "TableQuery", tq) - resp, err := f.WSStructServer.Request(host, msg, shttp.DefaultRequestTimeout) + resp, err := f.WSStructServer.Request(host, msg, ws.DefaultRequestTimeout) if err != nil { logging.GetLogger().Errorf("Unable to send message to agent %s: %s", host, err.Error()) flowset <- NewFlowSet() @@ -133,6 +133,6 @@ func (f *TableClient) LookupFlowsByNodes(hnmap topology.HostNodeTIDMap, flowSear } // NewTableClient creates a new table client based on websocket -func NewTableClient(w *shttp.WSStructServer) *TableClient { +func NewTableClient(w *ws.WSStructServer) *TableClient { return &TableClient{WSStructServer: w} } diff --git a/flow/ondemand/client/client.go b/flow/ondemand/client/client.go index 06e204b337..e7b433ceab 100644 --- a/flow/ondemand/client/client.go +++ b/flow/ondemand/client/client.go @@ -34,9 +34,9 @@ import ( "github.com/skydive-project/skydive/etcd" "github.com/skydive-project/skydive/flow/ondemand" ge "github.com/skydive-project/skydive/gremlin/traversal" - shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/topology/graph" + ws "github.com/skydive-project/skydive/websocket" ) // OnDemandProbeClient describes an ondemand probe client based on a websocket @@ -46,8 +46,8 @@ type OnDemandProbeClient struct { graph.DefaultGraphListener graph *graph.Graph captureHandler *api.CaptureAPIHandler - agentPool shttp.WSStructSpeakerPool - subscriberPool shttp.WSStructSpeakerPool + agentPool ws.WSStructSpeakerPool + subscriberPool ws.WSStructSpeakerPool captures map[string]*types.Capture watcher api.StoppableWatcher registeredNodes map[string]string @@ -62,7 +62,7 @@ type nodeProbe struct { } // OnWSStructMessage event, valid message type : CaptureStartReply or CaptureStopReply message -func (o *OnDemandProbeClient) OnWSStructMessage(c shttp.WSSpeaker, m *shttp.WSStructMessage) { +func (o *OnDemandProbeClient) OnWSStructMessage(c ws.WSSpeaker, m *ws.WSStructMessage) { var query ondemand.CaptureQuery if err := m.UnmarshalObj(&query); err != nil { logging.GetLogger().Errorf("Unable to decode capture %v", m) @@ -80,7 +80,7 @@ func (o *OnDemandProbeClient) OnWSStructMessage(c shttp.WSSpeaker, m *shttp.WSSt } else { logging.GetLogger().Debugf("Capture start request succeeded %v", m.Debug()) } - o.subscriberPool.BroadcastMessage(shttp.NewWSStructMessage(ondemand.NotificationNamespace, "CaptureNodeUpdated", query.Capture.UUID)) + o.subscriberPool.BroadcastMessage(ws.NewWSStructMessage(ondemand.NotificationNamespace, "CaptureNodeUpdated", query.Capture.UUID)) case "CaptureStopReply": if m.Status == http.StatusOK { logging.GetLogger().Debugf("Capture stop request succeeded %v", m.Debug()) @@ -156,7 +156,7 @@ func (o *OnDemandProbeClient) registerProbe(np nodeProbe) bool { Capture: *np.capture, } - msg := shttp.NewWSStructMessage(ondemand.Namespace, "CaptureStart", cq) + msg := ws.NewWSStructMessage(ondemand.Namespace, "CaptureStart", cq) if err := o.agentPool.SendMessageTo(msg, np.host); err != nil { logging.GetLogger().Errorf("Unable to send message to agent %s: %s", np.host, err.Error()) @@ -175,7 +175,7 @@ func (o *OnDemandProbeClient) unregisterProbe(node *graph.Node, capture *types.C Capture: *capture, } - msg := shttp.NewWSStructMessage(ondemand.Namespace, "CaptureStop", cq) + msg := ws.NewWSStructMessage(ondemand.Namespace, "CaptureStop", cq) if _, err := node.GetFieldString("Capture.ID"); err != nil { return false @@ -253,7 +253,7 @@ func (o *OnDemandProbeClient) OnNodeUpdated(n *graph.Node) { func (o *OnDemandProbeClient) OnNodeDeleted(n *graph.Node) { o.RLock() if uuid, ok := o.registeredNodes[string(n.ID)]; ok { - o.subscriberPool.BroadcastMessage(shttp.NewWSStructMessage(ondemand.NotificationNamespace, "CaptureNodeUpdated", uuid)) + o.subscriberPool.BroadcastMessage(ws.NewWSStructMessage(ondemand.NotificationNamespace, "CaptureNodeUpdated", uuid)) } o.RUnlock() } @@ -356,10 +356,10 @@ func (o *OnDemandProbeClient) onAPIWatcherEvent(action string, id string, resour capture := resource.(*types.Capture) switch action { case "init", "create", "set", "update": - o.subscriberPool.BroadcastMessage(shttp.NewWSStructMessage(ondemand.NotificationNamespace, "CaptureAdded", capture)) + o.subscriberPool.BroadcastMessage(ws.NewWSStructMessage(ondemand.NotificationNamespace, "CaptureAdded", capture)) o.onCaptureAdded(capture) case "expire", "delete": - o.subscriberPool.BroadcastMessage(shttp.NewWSStructMessage(ondemand.NotificationNamespace, "CaptureDeleted", capture)) + o.subscriberPool.BroadcastMessage(ws.NewWSStructMessage(ondemand.NotificationNamespace, "CaptureDeleted", capture)) o.onCaptureDeleted(capture) } } @@ -382,7 +382,7 @@ func (o *OnDemandProbeClient) Stop() { } // NewOnDemandProbeClient creates a new ondemand probe client based on Capture API, graph and websocket -func NewOnDemandProbeClient(g *graph.Graph, ch *api.CaptureAPIHandler, agentPool shttp.WSStructSpeakerPool, subscriberPool shttp.WSStructSpeakerPool, etcdClient *etcd.Client) *OnDemandProbeClient { +func NewOnDemandProbeClient(g *graph.Graph, ch *api.CaptureAPIHandler, agentPool ws.WSStructSpeakerPool, subscriberPool ws.WSStructSpeakerPool, etcdClient *etcd.Client) *OnDemandProbeClient { resources := ch.Index() captures := make(map[string]*types.Capture) for _, resource := range resources { diff --git a/flow/ondemand/server/server.go b/flow/ondemand/server/server.go index 3f41bcd434..2975d0997e 100644 --- a/flow/ondemand/server/server.go +++ b/flow/ondemand/server/server.go @@ -30,10 +30,10 @@ import ( "github.com/skydive-project/skydive/common" "github.com/skydive-project/skydive/flow/ondemand" "github.com/skydive-project/skydive/flow/probes" - shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/probe" "github.com/skydive-project/skydive/topology/graph" + ws "github.com/skydive-project/skydive/websocket" ) type activeProbe struct { @@ -47,10 +47,10 @@ type activeProbe struct { type OnDemandProbeServer struct { common.RWMutex graph.DefaultGraphListener - shttp.DefaultWSSpeakerEventHandler + ws.DefaultWSSpeakerEventHandler Graph *graph.Graph Probes *probe.ProbeBundle - WSStructClientPool *shttp.WSStructClientPool + WSStructClientPool *ws.WSStructClientPool activeProbes map[graph.Identifier]*activeProbe } @@ -160,7 +160,7 @@ func (p *activeProbe) OnStopped() { } // OnWSStructMessage websocket message, valid message type are CaptureStart, CaptureStop -func (o *OnDemandProbeServer) OnWSStructMessage(c shttp.WSSpeaker, msg *shttp.WSStructMessage) { +func (o *OnDemandProbeServer) OnWSStructMessage(c ws.WSSpeaker, msg *ws.WSStructMessage) { var query ondemand.CaptureQuery if err := msg.UnmarshalObj(&query); err != nil { logging.GetLogger().Errorf("Unable to decode capture %v", msg) @@ -247,7 +247,7 @@ func (o *OnDemandProbeServer) Stop() { } // NewOnDemandProbeServer creates a new Ondemand probes server based on graph and websocket -func NewOnDemandProbeServer(fb *probe.ProbeBundle, g *graph.Graph, pool *shttp.WSStructClientPool) (*OnDemandProbeServer, error) { +func NewOnDemandProbeServer(fb *probe.ProbeBundle, g *graph.Graph, pool *ws.WSStructClientPool) (*OnDemandProbeServer, error) { return &OnDemandProbeServer{ Graph: g, Probes: fb, diff --git a/flow/server.go b/flow/server.go index 5fc92d86fc..3c9a71899d 100644 --- a/flow/server.go +++ b/flow/server.go @@ -23,8 +23,8 @@ package flow import ( - shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" + ws "github.com/skydive-project/skydive/websocket" ) // Namespace "Flow" @@ -38,7 +38,7 @@ type TableServer struct { } // OnTableQuery event -func (s *TableServer) OnTableQuery(c shttp.WSSpeaker, msg *shttp.WSStructMessage) { +func (s *TableServer) OnTableQuery(c ws.WSSpeaker, msg *ws.WSStructMessage) { var query TableQuery if err := msg.UnmarshalObj(&query); err != nil { logging.GetLogger().Errorf("Unable to decode search flow message %v", msg) @@ -51,7 +51,7 @@ func (s *TableServer) OnTableQuery(c shttp.WSSpeaker, msg *shttp.WSStructMessage } // OnWSStructMessage TableQuery -func (s *TableServer) OnWSStructMessage(c shttp.WSSpeaker, msg *shttp.WSStructMessage) { +func (s *TableServer) OnWSStructMessage(c ws.WSSpeaker, msg *ws.WSStructMessage) { switch msg.Type { case "TableQuery": s.OnTableQuery(c, msg) @@ -59,7 +59,7 @@ func (s *TableServer) OnWSStructMessage(c shttp.WSSpeaker, msg *shttp.WSStructMe } // NewServer creates a new flow table query server based on websocket -func NewServer(allocator *TableAllocator, pool shttp.WSStructSpeakerPool) *TableServer { +func NewServer(allocator *TableAllocator, pool ws.WSStructSpeakerPool) *TableServer { s := &TableServer{ TableAllocator: allocator, } diff --git a/http/auth.go b/http/auth.go index 6048505439..f1bc376d23 100644 --- a/http/auth.go +++ b/http/auth.go @@ -48,6 +48,8 @@ const ( tokenName = "authtok" ) +// AuthenticationOpts describes the elements used by a client to authenticate +// to an HTTP server. It can be either a username/password couple or a token type AuthenticationOpts struct { Username string Password string diff --git a/http/client.go b/http/client.go index 301dfb2152..725ccd9fa2 100644 --- a/http/client.go +++ b/http/client.go @@ -59,7 +59,7 @@ func readBody(resp *http.Response) string { func getHttpClient() (*http.Client, error) { client := &http.Client{} if config.IsTLSenabled() { - tlsConfig, err := getTLSConfig(true) + tlsConfig, err := GetTLSConfig(true) if err != nil { return nil, err } diff --git a/http/server.go b/http/server.go index 25fd36884c..4f99b02d37 100644 --- a/http/server.go +++ b/http/server.go @@ -100,14 +100,6 @@ func copyRequestVars(old, new *http.Request) { } } -func getRequestParameter(r *http.Request, name string) string { - param := r.Header.Get(name) - if param == "" { - param = r.URL.Query().Get(strings.ToLower(name)) - } - return param -} - func (s *Server) RegisterRoutes(routes []Route, auth AuthenticationBackend) { for _, route := range routes { r := s.Router. diff --git a/http/tls.go b/http/tls.go index a49f443aab..efb898b1ae 100644 --- a/http/tls.go +++ b/http/tls.go @@ -37,7 +37,7 @@ func setTLSHeader(w http.ResponseWriter, r *http.Request) { } } -func getTLSConfig(setupRootCA bool) (*tls.Config, error) { +func GetTLSConfig(setupRootCA bool) (*tls.Config, error) { certPEM := config.GetString("agent.X509_cert") keyPEM := config.GetString("agent.X509_key") var tlsConfig *tls.Config diff --git a/packet_injector/client.go b/packet_injector/client.go index fe9f0e26aa..826118019a 100644 --- a/packet_injector/client.go +++ b/packet_injector/client.go @@ -35,10 +35,10 @@ import ( "github.com/skydive-project/skydive/common" "github.com/skydive-project/skydive/etcd" ge "github.com/skydive-project/skydive/gremlin/traversal" - shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/topology/graph" "github.com/skydive-project/skydive/validator" + ws "github.com/skydive-project/skydive/websocket" ) const ( @@ -55,7 +55,7 @@ type PacketInjectorReply struct { // PacketInjectorClient describes a packet injector client type PacketInjectorClient struct { *etcd.MasterElector - pool shttp.WSStructSpeakerPool + pool ws.WSStructSpeakerPool watcher apiServer.StoppableWatcher graph *graph.Graph piHandler *apiServer.PacketInjectorAPI @@ -63,9 +63,9 @@ type PacketInjectorClient struct { // StopInjection cancels a running packet injection func (pc *PacketInjectorClient) StopInjection(host string, uuid string) error { - msg := shttp.NewWSStructMessage(Namespace, "PIStopRequest", uuid) + msg := ws.NewWSStructMessage(Namespace, "PIStopRequest", uuid) - resp, err := pc.pool.Request(host, msg, shttp.DefaultRequestTimeout) + resp, err := pc.pool.Request(host, msg, ws.DefaultRequestTimeout) if err != nil { return fmt.Errorf("Unable to send message to agent %s: %s", host, err.Error()) } @@ -85,9 +85,9 @@ func (pc *PacketInjectorClient) StopInjection(host string, uuid string) error { // InjectPackets issues a packet injection request and returns the expected // tracking id func (pc *PacketInjectorClient) InjectPackets(host string, pp *PacketInjectionParams) (string, error) { - msg := shttp.NewWSStructMessage(Namespace, "PIRequest", pp) + msg := ws.NewWSStructMessage(Namespace, "PIRequest", pp) - resp, err := pc.pool.Request(host, msg, shttp.DefaultRequestTimeout) + resp, err := pc.pool.Request(host, msg, ws.DefaultRequestTimeout) if err != nil { return "", fmt.Errorf("Unable to send message to agent %s: %s", host, err.Error()) } @@ -314,7 +314,7 @@ func (pc *PacketInjectorClient) setTimeouts() { } // NewPacketInjectorClient returns a new packet injector client -func NewPacketInjectorClient(pool shttp.WSStructSpeakerPool, etcdClient *etcd.Client, piHandler *apiServer.PacketInjectorAPI, g *graph.Graph) *PacketInjectorClient { +func NewPacketInjectorClient(pool ws.WSStructSpeakerPool, etcdClient *etcd.Client, piHandler *apiServer.PacketInjectorAPI, g *graph.Graph) *PacketInjectorClient { elector := etcd.NewMasterElectorFromConfig(common.AnalyzerService, "pi-client", etcdClient) pic := &PacketInjectorClient{ diff --git a/packet_injector/server.go b/packet_injector/server.go index eb5efe3667..e9993749a8 100644 --- a/packet_injector/server.go +++ b/packet_injector/server.go @@ -26,9 +26,9 @@ import ( "fmt" "net/http" - shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/topology/graph" + ws "github.com/skydive-project/skydive/websocket" ) const ( @@ -42,7 +42,7 @@ type PacketInjectorServer struct { Channels *channels } -func (pis *PacketInjectorServer) stopPI(msg *shttp.WSStructMessage) error { +func (pis *PacketInjectorServer) stopPI(msg *ws.WSStructMessage) error { var uuid string if err := msg.DecodeObj(&uuid); err != nil { return err @@ -57,7 +57,7 @@ func (pis *PacketInjectorServer) stopPI(msg *shttp.WSStructMessage) error { return fmt.Errorf("No PI running on this ID: %s", uuid) } -func (pis *PacketInjectorServer) injectPacket(msg *shttp.WSStructMessage) (string, error) { +func (pis *PacketInjectorServer) injectPacket(msg *ws.WSStructMessage) (string, error) { var params PacketInjectionParams if err := msg.DecodeObj(¶ms); err != nil { return "", fmt.Errorf("Unable to decode packet inject param message %v", msg) @@ -72,10 +72,10 @@ func (pis *PacketInjectorServer) injectPacket(msg *shttp.WSStructMessage) (strin } // OnWSMessage event, websocket PIRequest message -func (pis *PacketInjectorServer) OnWSStructMessage(c shttp.WSSpeaker, msg *shttp.WSStructMessage) { +func (pis *PacketInjectorServer) OnWSStructMessage(c ws.WSSpeaker, msg *ws.WSStructMessage) { switch msg.Type { case "PIRequest": - var reply *shttp.WSStructMessage + var reply *ws.WSStructMessage trackingID, err := pis.injectPacket(msg) replyObj := &PacketInjectorReply{TrackingID: trackingID} if err != nil { @@ -89,7 +89,7 @@ func (pis *PacketInjectorServer) OnWSStructMessage(c shttp.WSSpeaker, msg *shttp c.SendMessage(reply) case "PIStopRequest": - var reply *shttp.WSStructMessage + var reply *ws.WSStructMessage err := pis.stopPI(msg) replyObj := &PacketInjectorReply{} if err != nil { @@ -103,7 +103,7 @@ func (pis *PacketInjectorServer) OnWSStructMessage(c shttp.WSSpeaker, msg *shttp } // NewServer creates a new packet injector server API based on websocket server -func NewServer(graph *graph.Graph, pool shttp.WSStructSpeakerPool) *PacketInjectorServer { +func NewServer(graph *graph.Graph, pool ws.WSStructSpeakerPool) *PacketInjectorServer { s := &PacketInjectorServer{ Graph: graph, Channels: &channels{Pipes: make(map[string](chan bool))}, diff --git a/tests/helper/helper.go b/tests/helper/helper.go index e299bec1ed..594c720de7 100644 --- a/tests/helper/helper.go +++ b/tests/helper/helper.go @@ -44,8 +44,8 @@ import ( "github.com/skydive-project/skydive/common" "github.com/skydive-project/skydive/config" "github.com/skydive-project/skydive/flow" - shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" + ws "github.com/skydive-project/skydive/websocket" ) type Cmd struct { @@ -246,13 +246,13 @@ func WSClose(ws *websocket.Conn) error { return ws.Close() } -func DecodeWSStructMessageJSON(b []byte) *shttp.WSStructMessage { - mJSON := shttp.WSStructMessageJSON{} +func DecodeWSStructMessageJSON(b []byte) *ws.WSStructMessage { + mJSON := ws.WSStructMessageJSON{} if err := json.Unmarshal(b, &mJSON); err != nil { return nil } - msg := &shttp.WSStructMessage{ - Protocol: shttp.JsonProtocol, + msg := &ws.WSStructMessage{ + Protocol: ws.JSONProtocol, Namespace: mJSON.Namespace, Type: mJSON.Type, UUID: mJSON.UUID, diff --git a/tests/scale_test.go b/tests/scale_test.go index 3a1972cb15..59da2645f0 100644 --- a/tests/scale_test.go +++ b/tests/scale_test.go @@ -34,6 +34,7 @@ import ( "time" "github.com/davecgh/go-spew/spew" + "github.com/skydive-project/skydive/analyzer" gclient "github.com/skydive-project/skydive/api/client" "github.com/skydive-project/skydive/api/types" "github.com/skydive-project/skydive/common" @@ -42,9 +43,10 @@ import ( g "github.com/skydive-project/skydive/gremlin" shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/tests/helper" + "github.com/skydive-project/skydive/websocket" ) -func getAnalyzerStatus(client *shttp.CrudClient) (status types.AnalyzerStatus, err error) { +func getAnalyzerStatus(client *shttp.CrudClient) (status analyzer.Status, err error) { resp, err := client.Request("GET", "status", nil, nil) if err != nil { return status, err @@ -96,7 +98,7 @@ func checkHostNodes(client *shttp.CrudClient, gh *gclient.GremlinQueryHelper, no return common.Retry(retry, 10, 5*time.Second) } -func checkPeers(client *shttp.CrudClient, peersExpected int, state shttp.WSConnState) error { +func checkPeers(client *shttp.CrudClient, peersExpected int, state websocket.ConnState) error { status, err := getAnalyzerStatus(client) if err != nil { return err diff --git a/tests/topology_test.go b/tests/topology_test.go index fda734af3a..cf93fa0b56 100644 --- a/tests/topology_test.go +++ b/tests/topology_test.go @@ -40,6 +40,7 @@ import ( "github.com/skydive-project/skydive/tests/helper" "github.com/skydive-project/skydive/topology" "github.com/skydive-project/skydive/topology/graph" + ws "github.com/skydive-project/skydive/websocket" ) func TestBridgeOVS(t *testing.T) { @@ -571,11 +572,11 @@ func TestOVSOwnershipLink(t *testing.T) { } type TopologyInjecter struct { - shttp.DefaultWSSpeakerEventHandler + ws.DefaultWSSpeakerEventHandler connected int32 } -func (t *TopologyInjecter) OnConnected(c shttp.WSSpeaker) { +func (t *TopologyInjecter) OnConnected(c ws.WSSpeaker) { atomic.StoreInt32(&t.connected, 1) } @@ -589,13 +590,13 @@ func TestQueryMetadata(t *testing.T) { } hostname, _ := os.Hostname() - wspool := shttp.NewWSStructClientPool("TestQueryMetadata") + wspool := ws.NewWSStructClientPool("TestQueryMetadata") for _, sa := range addresses { - client := shttp.NewWSClient(hostname+"-cli", common.UnknownService, config.GetURL("ws", sa.Addr, sa.Port, "/ws/publisher"), authOptions, http.Header{}, 1000) + client := ws.NewWSClient(hostname+"-cli", common.UnknownService, config.GetURL("ws", sa.Addr, sa.Port, "/ws/publisher"), authOptions, http.Header{}, 1000) wspool.AddClient(client) } - masterElection := shttp.NewWSMasterElection(wspool) + masterElection := ws.NewWSMasterElection(wspool) eventHandler := &TopologyInjecter{} wspool.AddEventHandler(eventHandler) @@ -633,12 +634,12 @@ func TestQueryMetadata(t *testing.T) { n.Decode(m) // The first message should be rejected as it has no 'Type' attribute - msg := shttp.NewWSStructMessage(graph.Namespace, graph.NodeAddedMsgType, n) + msg := ws.NewWSStructMessage(graph.Namespace, graph.NodeAddedMsgType, n) masterElection.SendMessageToMaster(msg) m["Metadata"].(map[string]interface{})["Type"] = "external" n.Decode(m) - msg = shttp.NewWSStructMessage(graph.Namespace, graph.NodeAddedMsgType, n) + msg = ws.NewWSStructMessage(graph.Namespace, graph.NodeAddedMsgType, n) masterElection.SendMessageToMaster(msg) return nil diff --git a/topology/graph/message.go b/topology/graph/message.go index bcd2571bce..22d5a23376 100644 --- a/topology/graph/message.go +++ b/topology/graph/message.go @@ -27,7 +27,7 @@ import ( "errors" "github.com/skydive-project/skydive/common" - shttp "github.com/skydive-project/skydive/http" + ws "github.com/skydive-project/skydive/websocket" ) // Graph message type @@ -63,7 +63,7 @@ type SyncMsg struct { } // UnmarshalWSMessage deserialize the websocket message -func UnmarshalWSMessage(msg *shttp.WSStructMessage) (string, interface{}, error) { +func UnmarshalWSMessage(msg *ws.WSStructMessage) (string, interface{}, error) { var obj interface{} if err := msg.DecodeObj(&obj); err != nil { return "", msg, err diff --git a/topology/topology_subscriber_endpoint.go b/topology/topology_subscriber_endpoint.go index 5a2126b3de..0c79426d11 100644 --- a/topology/topology_subscriber_endpoint.go +++ b/topology/topology_subscriber_endpoint.go @@ -29,10 +29,10 @@ import ( "sync" "github.com/skydive-project/skydive/common" - shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/topology/graph" "github.com/skydive-project/skydive/topology/graph/traversal" + ws "github.com/skydive-project/skydive/websocket" ) type topologySubscriber struct { @@ -44,8 +44,8 @@ type topologySubscriber struct { // TopologySubscriberEndpoint sends all the modifications to its subscribers. type TopologySubscriberEndpoint struct { common.RWMutex - shttp.DefaultWSSpeakerEventHandler - pool shttp.WSStructSpeakerPool + ws.DefaultWSSpeakerEventHandler + pool ws.WSStructSpeakerPool Graph *graph.Graph wg sync.WaitGroup gremlinParser *traversal.GremlinTraversalParser @@ -81,7 +81,7 @@ func (t *TopologySubscriberEndpoint) newTopologySubscriber(host string, gremlinF } // OnConnected called when a subscriber got connected. -func (t *TopologySubscriberEndpoint) OnConnected(c shttp.WSSpeaker) { +func (t *TopologySubscriberEndpoint) OnConnected(c ws.WSSpeaker) { gremlinFilter := c.GetHeaders().Get("X-Gremlin-Filter") if gremlinFilter == "" { gremlinFilter = c.GetURL().Query().Get("x-gremlin-filter") @@ -102,7 +102,7 @@ func (t *TopologySubscriberEndpoint) OnConnected(c shttp.WSSpeaker) { } // OnDisconnected called when a subscriber got disconnected. -func (t *TopologySubscriberEndpoint) OnDisconnected(c shttp.WSSpeaker) { +func (t *TopologySubscriberEndpoint) OnDisconnected(c ws.WSSpeaker) { t.Lock() delete(t.subscribers, c.GetRemoteHost()) t.Unlock() @@ -110,7 +110,7 @@ func (t *TopologySubscriberEndpoint) OnDisconnected(c shttp.WSSpeaker) { // OnWSStructMessage is triggered when receiving a message from a subscriber. // It only responds to SyncRequestMsgType messages -func (t *TopologySubscriberEndpoint) OnWSStructMessage(c shttp.WSSpeaker, msg *shttp.WSStructMessage) { +func (t *TopologySubscriberEndpoint) OnWSStructMessage(c ws.WSSpeaker, msg *ws.WSStructMessage) { msgType, obj, err := graph.UnmarshalWSMessage(msg) if err != nil { logging.GetLogger().Errorf("Graph: Unable to parse the event %v: %s", msg, err) @@ -156,7 +156,7 @@ func (t *TopologySubscriberEndpoint) OnWSStructMessage(c shttp.WSSpeaker, msg *s // notifyClients forwards local graph modification to subscribers. If a subscriber // specified a Gremlin filter, a 'Diff' is applied between the previous graph state // for this subscriber and the current graph state. -func (t *TopologySubscriberEndpoint) notifyClients(msg *shttp.WSStructMessage) { +func (t *TopologySubscriberEndpoint) notifyClients(msg *ws.WSStructMessage) { for _, c := range t.pool.GetSpeakers() { t.RLock() subscriber, found := t.subscribers[c.GetRemoteHost()] @@ -172,19 +172,19 @@ func (t *TopologySubscriberEndpoint) notifyClients(msg *shttp.WSStructMessage) { addedNodes, removedNodes, addedEdges, removedEdges := subscriber.graph.Diff(g) for _, n := range addedNodes { - c.SendMessage(shttp.NewWSStructMessage(graph.Namespace, graph.NodeAddedMsgType, n)) + c.SendMessage(ws.NewWSStructMessage(graph.Namespace, graph.NodeAddedMsgType, n)) } for _, n := range removedNodes { - c.SendMessage(shttp.NewWSStructMessage(graph.Namespace, graph.NodeDeletedMsgType, n)) + c.SendMessage(ws.NewWSStructMessage(graph.Namespace, graph.NodeDeletedMsgType, n)) } for _, e := range addedEdges { - c.SendMessage(shttp.NewWSStructMessage(graph.Namespace, graph.EdgeAddedMsgType, e)) + c.SendMessage(ws.NewWSStructMessage(graph.Namespace, graph.EdgeAddedMsgType, e)) } for _, e := range removedEdges { - c.SendMessage(shttp.NewWSStructMessage(graph.Namespace, graph.EdgeDeletedMsgType, e)) + c.SendMessage(ws.NewWSStructMessage(graph.Namespace, graph.EdgeDeletedMsgType, e)) } subscriber.graph = g @@ -196,37 +196,37 @@ func (t *TopologySubscriberEndpoint) notifyClients(msg *shttp.WSStructMessage) { // OnNodeUpdated graph node updated event. Implements the GraphEventListener interface. func (t *TopologySubscriberEndpoint) OnNodeUpdated(n *graph.Node) { - t.notifyClients(shttp.NewWSStructMessage(graph.Namespace, graph.NodeUpdatedMsgType, n)) + t.notifyClients(ws.NewWSStructMessage(graph.Namespace, graph.NodeUpdatedMsgType, n)) } // OnNodeAdded graph node added event. Implements the GraphEventListener interface. func (t *TopologySubscriberEndpoint) OnNodeAdded(n *graph.Node) { - t.notifyClients(shttp.NewWSStructMessage(graph.Namespace, graph.NodeAddedMsgType, n)) + t.notifyClients(ws.NewWSStructMessage(graph.Namespace, graph.NodeAddedMsgType, n)) } // OnNodeDeleted graph node deleted event. Implements the GraphEventListener interface. func (t *TopologySubscriberEndpoint) OnNodeDeleted(n *graph.Node) { - t.notifyClients(shttp.NewWSStructMessage(graph.Namespace, graph.NodeDeletedMsgType, n)) + t.notifyClients(ws.NewWSStructMessage(graph.Namespace, graph.NodeDeletedMsgType, n)) } // OnEdgeUpdated graph edge updated event. Implements the GraphEventListener interface. func (t *TopologySubscriberEndpoint) OnEdgeUpdated(e *graph.Edge) { - t.notifyClients(shttp.NewWSStructMessage(graph.Namespace, graph.EdgeUpdatedMsgType, e)) + t.notifyClients(ws.NewWSStructMessage(graph.Namespace, graph.EdgeUpdatedMsgType, e)) } // OnEdgeAdded graph edge added event. Implements the GraphEventListener interface. func (t *TopologySubscriberEndpoint) OnEdgeAdded(e *graph.Edge) { - t.notifyClients(shttp.NewWSStructMessage(graph.Namespace, graph.EdgeAddedMsgType, e)) + t.notifyClients(ws.NewWSStructMessage(graph.Namespace, graph.EdgeAddedMsgType, e)) } // OnEdgeDeleted graph edge deleted event. Implements the GraphEventListener interface. func (t *TopologySubscriberEndpoint) OnEdgeDeleted(e *graph.Edge) { - t.notifyClients(shttp.NewWSStructMessage(graph.Namespace, graph.EdgeDeletedMsgType, e)) + t.notifyClients(ws.NewWSStructMessage(graph.Namespace, graph.EdgeDeletedMsgType, e)) } // NewTopologySubscriberEndpoint returns a new server to be used by external subscribers, // for instance the WebUI. -func NewTopologySubscriberEndpoint(pool shttp.WSStructSpeakerPool, g *graph.Graph, tr *traversal.GremlinTraversalParser) *TopologySubscriberEndpoint { +func NewTopologySubscriberEndpoint(pool ws.WSStructSpeakerPool, g *graph.Graph, tr *traversal.GremlinTraversalParser) *TopologySubscriberEndpoint { t := &TopologySubscriberEndpoint{ Graph: g, pool: pool, diff --git a/http/master.go b/websocket/master.go similarity index 99% rename from http/master.go rename to websocket/master.go index 20830ce13b..c103d63b2b 100644 --- a/http/master.go +++ b/websocket/master.go @@ -20,7 +20,7 @@ * */ -package http +package websocket import "github.com/skydive-project/skydive/common" diff --git a/http/pool.go b/websocket/pool.go similarity index 98% rename from http/pool.go rename to websocket/pool.go index 76a118bb81..bacc10c44a 100644 --- a/http/pool.go +++ b/websocket/pool.go @@ -20,7 +20,7 @@ * */ -package http +package websocket import ( "math/rand" @@ -150,8 +150,8 @@ func (s *WSPool) RemoveClient(c WSSpeaker) bool { } // GetStatus returns the states of the WebSocket clients -func (s *WSPool) GetStatus() map[string]WSConnStatus { - clients := make(map[string]WSConnStatus) +func (s *WSPool) GetStatus() map[string]ConnStatus { + clients := make(map[string]ConnStatus) for _, client := range s.GetSpeakers() { clients[client.GetRemoteHost()] = client.GetStatus() } diff --git a/http/wsclient.go b/websocket/wsclient.go similarity index 92% rename from http/wsclient.go rename to websocket/wsclient.go index ffec3aeefe..a8359ecd46 100644 --- a/http/wsclient.go +++ b/websocket/wsclient.go @@ -20,12 +20,12 @@ * */ -package http +package websocket import ( "encoding/json" "errors" - "fmt" + fmt "fmt" "net/http" "net/url" "strconv" @@ -38,6 +38,7 @@ import ( "github.com/skydive-project/skydive/common" "github.com/skydive-project/skydive/config" + shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" ) @@ -46,58 +47,25 @@ const ( writeWait = 10 * time.Second ) -// WSMessage is the interface of a message to send over the wire -type WSMessage interface { - Bytes(protocol string) []byte -} - -// WSRawMessage represents a raw message (array of bytes) -type WSRawMessage []byte - -// Bytes returns the string representation of the raw message -func (m WSRawMessage) Bytes(protocol string) []byte { - return m -} - -// WSSpeaker is the interface for a websocket speaking client. It is used for outgoing -// or incoming connections. -type WSSpeaker interface { - GetStatus() WSConnStatus - GetHost() string - GetAddrPort() (string, int) - GetServiceType() common.ServiceType - GetClientProtocol() string - GetHeaders() http.Header - GetURL() *url.URL - IsConnected() bool - SendMessage(m WSMessage) error - SendRaw(r []byte) error - Connect() - Disconnect() - AddEventHandler(WSSpeakerEventHandler) - GetRemoteHost() string - GetRemoteServiceType() common.ServiceType -} - -// WSConnState describes the connection state -type WSConnState int32 +// ConnState describes the connection state +type ConnState int32 // WSConnStatus describes the status of a WebSocket connection -type WSConnStatus struct { +type ConnStatus struct { ServiceType common.ServiceType ClientProtocol string Addr string Port int - Host string `json:"-"` - State *WSConnState `json:"IsConnected"` - URL *url.URL `json:"-"` - headers http.Header + Host string `json:"-"` + State *ConnState `json:"IsConnected"` + URL *url.URL `json:"-"` + Headers http.Header `json:"-"` ConnectTime time.Time RemoteHost string `json:",omitempty"` RemoteServiceType common.ServiceType `json:",omitempty"` } -func (s *WSConnState) MarshalJSON() ([]byte, error) { +func (s *ConnState) MarshalJSON() ([]byte, error) { switch *s { case common.RunningState: return []byte("true"), nil @@ -108,7 +76,7 @@ func (s *WSConnState) MarshalJSON() ([]byte, error) { } // UnmarshalJSON deserialize a connection state -func (s *WSConnState) UnmarshalJSON(b []byte) error { +func (s *ConnState) UnmarshalJSON(b []byte) error { var state bool if err := json.Unmarshal(b, &state); err != nil { return err @@ -123,10 +91,43 @@ func (s *WSConnState) UnmarshalJSON(b []byte) error { return nil } +// WSMessage is the interface of a message to send over the wire +type WSMessage interface { + Bytes(protocol string) []byte +} + +// WSRawMessage represents a raw message (array of bytes) +type WSRawMessage []byte + +// Bytes returns the string representation of the raw message +func (m WSRawMessage) Bytes(protocol string) []byte { + return m +} + +// WSSpeaker is the interface for a websocket speaking client. It is used for outgoing +// or incoming connections. +type WSSpeaker interface { + GetStatus() ConnStatus + GetHost() string + GetAddrPort() (string, int) + GetServiceType() common.ServiceType + GetClientProtocol() string + GetHeaders() http.Header + GetURL() *url.URL + IsConnected() bool + SendMessage(m WSMessage) error + SendRaw(r []byte) error + Connect() + Disconnect() + AddEventHandler(WSSpeakerEventHandler) + GetRemoteHost() string + GetRemoteServiceType() common.ServiceType +} + // WSConn is the connection object of a WSSpeaker type WSConn struct { common.RWMutex - WSConnStatus + ConnStatus send chan []byte read chan []byte quit chan bool @@ -148,7 +149,7 @@ type wsIncomingClient struct { type WSClient struct { *WSConn Path string - AuthOpts *AuthenticationOpts + AuthOpts *shttp.AuthenticationOpts } // WSSpeakerEventHandler is the interface to be implement by the client events listeners. @@ -195,14 +196,14 @@ func (c *WSConn) IsConnected() bool { } // GetStatus returns the status of a WebSocket connection -func (c *WSConn) GetStatus() WSConnStatus { +func (c *WSConn) GetStatus() ConnStatus { c.RLock() defer c.RUnlock() - status := c.WSConnStatus - status.State = new(WSConnState) - *status.State = WSConnState(atomic.LoadInt32((*int32)(c.State))) - return c.WSConnStatus + status := c.ConnStatus + status.State = new(ConnState) + *status.State = ConnState(atomic.LoadInt32((*int32)(c.State))) + return c.ConnStatus } // WSSpeakerStructMessageHandler interface used to receive Struct messages. @@ -244,7 +245,7 @@ func (c *WSConn) GetClientProtocol() string { // GetHeaders returns the client HTTP headers. func (c *WSConn) GetHeaders() http.Header { - return c.headers + return c.Headers } // GetRemoteHost returns the hostname/host-id of the remote side of the connection. @@ -383,15 +384,15 @@ func newWSConn(host string, clientType common.ServiceType, clientProtocol string port, _ := strconv.Atoi(url.Port()) c := &WSConn{ - WSConnStatus: WSConnStatus{ + ConnStatus: ConnStatus{ Host: host, ServiceType: clientType, ClientProtocol: clientProtocol, Addr: url.Hostname(), Port: port, - State: new(WSConnState), + State: new(ConnState), URL: url, - headers: headers, + Headers: headers, ConnectTime: time.Now(), }, send: make(chan []byte, queueSize), @@ -423,7 +424,7 @@ func (c *WSClient) connect() { } if c.AuthOpts != nil { - SetAuthHeaders(&headers, c.AuthOpts) + shttp.SetAuthHeaders(&headers, c.AuthOpts) } d := websocket.Dialer{ @@ -431,7 +432,7 @@ func (c *WSClient) connect() { ReadBufferSize: 1024, WriteBufferSize: 1024, } - d.TLSClientConfig, err = getTLSConfig(false) + d.TLSClientConfig, err = shttp.GetTLSConfig(false) if err != nil { logging.GetLogger().Errorf("Unable to create a WebSocket connection %s : %s", endpoint, err) return @@ -495,7 +496,7 @@ func (c *WSClient) Connect() { } // NewWSClient returns a WSClient with a new connection. -func NewWSClient(host string, clientType common.ServiceType, url *url.URL, authOpts *AuthenticationOpts, headers http.Header, queueSize int) *WSClient { +func NewWSClient(host string, clientType common.ServiceType, url *url.URL, authOpts *shttp.AuthenticationOpts, headers http.Header, queueSize int) *WSClient { wsconn := newWSConn(host, clientType, ProtobufProtocol, url, headers, queueSize) c := &WSClient{ WSConn: wsconn, @@ -506,7 +507,7 @@ func NewWSClient(host string, clientType common.ServiceType, url *url.URL, authO } // NewWSClientFromConfig creates a WSClient based on the configuration -func NewWSClientFromConfig(clientType common.ServiceType, url *url.URL, authOpts *AuthenticationOpts, headers http.Header) *WSClient { +func NewWSClientFromConfig(clientType common.ServiceType, url *url.URL, authOpts *shttp.AuthenticationOpts, headers http.Header) *WSClient { host := config.GetString("host_id") queueSize := config.GetInt("http.ws.queue_size") return NewWSClient(host, clientType, url, authOpts, headers, queueSize) @@ -520,7 +521,7 @@ func newIncomingWSClient(conn *websocket.Conn, r *auth.AuthenticatedRequest) *ws } clientProtocol := getRequestParameter(&r.Request, "X-Client-Protocol") if clientProtocol != ProtobufProtocol { - clientProtocol = JsonProtocol + clientProtocol = JSONProtocol } host := config.GetString("host_id") diff --git a/http/wsmessage.go b/websocket/wsmessage.go similarity index 97% rename from http/wsmessage.go rename to websocket/wsmessage.go index f56590e384..b24a156110 100644 --- a/http/wsmessage.go +++ b/websocket/wsmessage.go @@ -20,7 +20,7 @@ * */ -package http +package websocket import ( "bytes" @@ -43,8 +43,11 @@ import ( const ( // WildcardNamespace is the namespace used as wildcard. It is used by listeners to filter callbacks. WildcardNamespace = "*" - ProtobufProtocol = "protobuf" - JsonProtocol = "json" + + // ProtobufProtocol is used for protobuf encoded messages + ProtobufProtocol = "protobuf" + // JSONProtocol is used for JSON encoded messages + JSONProtocol = "json" ) // DefaultRequestTimeout default timeout used for Request/Reply JSON message. @@ -74,7 +77,7 @@ type WSStructMessage struct { // Debug representation of the struct WSStructMessage func (g *WSStructMessage) Debug() string { - if g.Protocol == JsonProtocol { + if g.Protocol == JSONProtocol { return fmt.Sprintf("Namespace %s Type %s UUID %s Status %d Obj JSON (%d) : %q", g.Namespace, g.Type, g.UUID, g.Status, len(*g.JsonObj), string(*g.JsonObj)) } @@ -134,7 +137,7 @@ func (g WSStructMessage) Bytes(protocol string) []byte { } func (g *WSStructMessage) marshalObj() { - if g.Protocol == JsonProtocol { + if g.Protocol == JSONProtocol { b, err := json.Marshal(g.value) if err != nil { logging.GetLogger().Error("Json Marshal encode value failed", err) @@ -154,7 +157,7 @@ func (g *WSStructMessage) marshalObj() { } func (g *WSStructMessage) DecodeObj(obj interface{}) error { - if g.Protocol == JsonProtocol { + if g.Protocol == JSONProtocol { if err := common.JSONDecode(bytes.NewReader([]byte(*g.JsonObj)), obj); err != nil { return err } @@ -168,7 +171,7 @@ func (g *WSStructMessage) DecodeObj(obj interface{}) error { } func (g *WSStructMessage) UnmarshalObj(obj interface{}) error { - if g.Protocol == JsonProtocol { + if g.Protocol == JSONProtocol { if err := json.Unmarshal(*g.JsonObj, obj); err != nil { return err } @@ -380,12 +383,12 @@ func (s *WSStructSpeaker) OnMessage(c WSSpeaker, m WSMessage) { msg.ProtobufObj = mProtobuf.Obj } else { mJSON := WSStructMessageJSON{} - b := m.Bytes(JsonProtocol) + b := m.Bytes(JSONProtocol) if err := json.Unmarshal(b, &mJSON); err != nil { logging.GetLogger().Errorf("Error while decoding JSON WSStructMessage %s\n%s", err.Error(), hex.Dump(b)) return } - msg.Protocol = JsonProtocol + msg.Protocol = JSONProtocol msg.Namespace = mJSON.Namespace msg.Type = mJSON.Type msg.UUID = mJSON.UUID diff --git a/http/wsmessage_test.go b/websocket/wsmessage_test.go similarity index 96% rename from http/wsmessage_test.go rename to websocket/wsmessage_test.go index 313a8ca05d..a1265a746c 100644 --- a/http/wsmessage_test.go +++ b/websocket/wsmessage_test.go @@ -20,7 +20,7 @@ * */ -package http +package websocket import ( "errors" @@ -31,6 +31,7 @@ import ( "github.com/skydive-project/skydive/common" "github.com/skydive-project/skydive/config" + shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" ) @@ -99,12 +100,12 @@ func (f *fakeWSMessageClientSubscriptionHandler) OnWSStructMessage(c WSSpeaker, func TestWSMessageSubscription(t *testing.T) { logging.InitLogging() - httpserver := NewServer("myhost", common.AnalyzerService, "localhost", 59999, "") + httpserver := shttp.NewServer("myhost", common.AnalyzerService, "localhost", 59999, "") go httpserver.ListenAndServe() defer httpserver.Stop() - wsserver := NewWSStructServer(NewWSServer(httpserver, "/wstest", NewNoAuthenticationBackend())) + wsserver := NewWSStructServer(NewWSServer(httpserver, "/wstest", shttp.NewNoAuthenticationBackend())) serverHandler := &fakeWSMessageServerSubscriptionHandler{t: t, server: wsserver, received: make(map[string]bool)} wsserver.AddEventHandler(serverHandler) diff --git a/http/wsserver.go b/websocket/wsserver.go similarity index 89% rename from http/wsserver.go rename to websocket/wsserver.go index 14046dc145..84e0c3c0d3 100644 --- a/http/wsserver.go +++ b/websocket/wsserver.go @@ -20,15 +20,17 @@ * */ -package http +package websocket import ( "net/http" + "strings" "github.com/abbot/go-http-auth" "github.com/gorilla/websocket" "github.com/skydive-project/skydive/common" + shttp "github.com/skydive-project/skydive/http" "github.com/skydive-project/skydive/logging" "github.com/skydive-project/skydive/rbac" ) @@ -40,7 +42,7 @@ type WSIncomerHandler func(*websocket.Conn, *auth.AuthenticatedRequest) WSSpeake type WSServer struct { common.RWMutex *wsIncomerPool - server *Server + server *shttp.Server incomerHandler WSIncomerHandler } @@ -53,6 +55,14 @@ func defaultIncomerHandler(conn *websocket.Conn, r *auth.AuthenticatedRequest) * return c } +func getRequestParameter(r *http.Request, name string) string { + param := r.Header.Get(name) + if param == "" { + param = r.URL.Query().Get(strings.ToLower(name)) + } + return param +} + func (s *WSServer) serveMessages(w http.ResponseWriter, r *auth.AuthenticatedRequest) { logging.GetLogger().Debugf("Enforcing websocket for %s, %s", s.name, r.Username) if rbac.Enforce(r.Username, "websocket", s.name) == false { @@ -99,7 +109,7 @@ func (s *WSServer) serveMessages(w http.ResponseWriter, r *auth.AuthenticatedReq } // NewWSServer returns a new WSServer. The given auth backend will validate the credentials -func NewWSServer(server *Server, endpoint string, authBackend AuthenticationBackend) *WSServer { +func NewWSServer(server *shttp.Server, endpoint string, authBackend shttp.AuthenticationBackend) *WSServer { s := &WSServer{ wsIncomerPool: newWSIncomerPool(endpoint), // server inherites from a WSSpeaker pool incomerHandler: func(c *websocket.Conn, a *auth.AuthenticatedRequest) WSSpeaker { diff --git a/http/wsserver_test.go b/websocket/wsserver_test.go similarity index 83% rename from http/wsserver_test.go rename to websocket/wsserver_test.go index c946031b30..1163d536d1 100644 --- a/http/wsserver_test.go +++ b/websocket/wsserver_test.go @@ -20,7 +20,7 @@ * */ -package http +package websocket import ( "errors" @@ -31,6 +31,7 @@ import ( "github.com/skydive-project/skydive/common" "github.com/skydive-project/skydive/config" + shttp "github.com/skydive-project/skydive/http" ) type fakeServerSubscriptionHandler struct { @@ -88,29 +89,29 @@ func (f *fakeClientSubscriptionHandler) OnMessage(c WSSpeaker, m WSMessage) { } func TestSubscription(t *testing.T) { - httpserver := NewServer("myhost", common.AnalyzerService, "localhost", 59999, "") + httpServer := shttp.NewServer("myhost", common.AnalyzerService, "localhost", 59999, "") - go httpserver.ListenAndServe() - defer httpserver.Stop() + go httpServer.ListenAndServe() + defer httpServer.Stop() - wsserver := NewWSServer(httpserver, "/wstest", NewNoAuthenticationBackend()) + wsServer := NewWSServer(httpServer, "/wstest", shttp.NewNoAuthenticationBackend()) - serverHandler := &fakeServerSubscriptionHandler{t: t, server: wsserver, connected: 0, received: 0} - wsserver.AddEventHandler(serverHandler) + serverHandler := &fakeServerSubscriptionHandler{t: t, server: wsServer, connected: 0, received: 0} + wsServer.AddEventHandler(serverHandler) - wsserver.Start() - defer wsserver.Stop() + wsServer.Start() + defer wsServer.Stop() - wsclient := NewWSClient("myhost", common.AgentService, config.GetURL("ws", "localhost", 59999, "/wstest"), nil, http.Header{}, 1000) - wspool := NewWSClientPool("TestSubscription") + wsClient := NewWSClient("myhost", common.AgentService, config.GetURL("ws", "localhost", 59999, "/wstest"), nil, http.Header{}, 1000) + wsPool := NewWSClientPool("TestSubscription") - wspool.AddClient(wsclient) + wsPool.AddClient(wsClient) clientHandler := &fakeClientSubscriptionHandler{t: t, received: 0} - wsclient.AddEventHandler(clientHandler) - wspool.AddEventHandler(clientHandler) - wsclient.Connect() - defer wsclient.Disconnect() + wsClient.AddEventHandler(clientHandler) + wsPool.AddEventHandler(clientHandler) + wsClient.Connect() + defer wsClient.Disconnect() err := common.Retry(func() error { clientHandler.Lock() diff --git a/http/wsstructmessage.proto b/websocket/wsstructmessage.proto similarity index 98% rename from http/wsstructmessage.proto rename to websocket/wsstructmessage.proto index 68656d913e..4c174ce50d 100644 --- a/http/wsstructmessage.proto +++ b/websocket/wsstructmessage.proto @@ -25,7 +25,7 @@ syntax = "proto3"; o proto3 fields are optional by default o required fields are not allowed in proto3 */ -package http; +package websocket; // WSStructMessage is a Protobuf based message on top of WSMessage. // It implements WSMessage interface and can be sent with via a WSSpeaker.