Skip to content

Commit

Permalink
gremlin: move gremlin extensions to a common place
Browse files Browse the repository at this point in the history
The goal is to ease the metrics step extraction
as "standard" step to become an extension. This
will fix cyclic dependency issues.

This patch also deprecates the GraphPath step.
  • Loading branch information
safchain committed Jan 8, 2018
1 parent f1029b5 commit 6d25592
Show file tree
Hide file tree
Showing 22 changed files with 416 additions and 580 deletions.
3 changes: 2 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/skydive-project/skydive/flow/enhancers"
ondemand "github.com/skydive-project/skydive/flow/ondemand/server"
fprobes "github.com/skydive-project/skydive/flow/probes"
ge "github.com/skydive-project/skydive/gremlin/traversal"
shttp "github.com/skydive-project/skydive/http"
"github.com/skydive-project/skydive/packet_injector"
"github.com/skydive-project/skydive/probe"
Expand Down Expand Up @@ -178,7 +179,7 @@ func NewAgent() (*Agent, error) {
wsServer := shttp.NewWSJSONServer(shttp.NewWSServer(hserver, "/ws/subscriber"))

tr := traversal.NewGremlinTraversalParser()
tr.AddTraversalExtension(topology.NewTopologyTraversalExtension())
tr.AddTraversalExtension(ge.NewMetricsTraversalExtension())

rootNode, err := createRootNode(g)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions analyzer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/skydive-project/skydive/flow"
ondemand "github.com/skydive-project/skydive/flow/ondemand/client"
"github.com/skydive-project/skydive/flow/storage"
ftraversal "github.com/skydive-project/skydive/flow/traversal"
ge "github.com/skydive-project/skydive/gremlin/traversal"
shttp "github.com/skydive-project/skydive/http"
"github.com/skydive-project/skydive/logging"
"github.com/skydive-project/skydive/packet_injector"
Expand Down Expand Up @@ -278,8 +278,8 @@ func NewServerFromConfig() (*Server, error) {
}

tr := traversal.NewGremlinTraversalParser()
tr.AddTraversalExtension(topology.NewTopologyTraversalExtension())
tr.AddTraversalExtension(ftraversal.NewFlowTraversalExtension(tableClient, storage))
tr.AddTraversalExtension(ge.NewMetricsTraversalExtension())
tr.AddTraversalExtension(ge.NewFlowTraversalExtension(tableClient, storage))

alertServer := alert.NewAlertServer(alertAPIHandler, subscriberWSServer, g, tr, etcdClient)

Expand Down
4 changes: 2 additions & 2 deletions api/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/nu7hatch/gouuid"

"github.com/skydive-project/skydive/common"
ge "github.com/skydive-project/skydive/gremlin/traversal"
"github.com/skydive-project/skydive/logging"
"github.com/skydive-project/skydive/topology"
"github.com/skydive-project/skydive/topology/graph"
)

Expand Down Expand Up @@ -96,7 +96,7 @@ func (c *CaptureAPIHandler) Decorate(resource Resource) {
c.Graph.RLock()
defer c.Graph.RUnlock()

res, err := topology.ExecuteGremlinQuery(c.Graph, capture.GremlinQuery)
res, err := ge.TopologyGremlinQuery(c.Graph, capture.GremlinQuery)
if err != nil {
logging.GetLogger().Errorf("Gremlin error: %s", err.Error())
return
Expand Down
4 changes: 2 additions & 2 deletions api/packet_injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import (

"github.com/abbot/go-http-auth"

ge "github.com/skydive-project/skydive/gremlin/traversal"
shttp "github.com/skydive-project/skydive/http"
"github.com/skydive-project/skydive/logging"
"github.com/skydive-project/skydive/packet_injector"
"github.com/skydive-project/skydive/topology"
"github.com/skydive-project/skydive/topology/graph"
"github.com/skydive-project/skydive/validator"
)
Expand Down Expand Up @@ -204,7 +204,7 @@ func (pi *PacketInjectorAPI) injectPacket(w http.ResponseWriter, r *auth.Authent
}

func (pi *PacketInjectorAPI) getNode(gremlinQuery string) *graph.Node {
res, err := topology.ExecuteGremlinQuery(pi.Graph, gremlinQuery)
res, err := ge.TopologyGremlinQuery(pi.Graph, gremlinQuery)
if err != nil {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions api/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

"github.com/abbot/go-http-auth"
"github.com/skydive-project/skydive/flow"
ftraversal "github.com/skydive-project/skydive/flow/traversal"
ge "github.com/skydive-project/skydive/gremlin/traversal"
shttp "github.com/skydive-project/skydive/http"
"github.com/skydive-project/skydive/logging"
"github.com/skydive-project/skydive/topology/graph"
Expand Down Expand Up @@ -162,7 +162,7 @@ func (t *TopologyAPI) topologySearch(w http.ResponseWriter, r *auth.Authenticate
writeError(w, http.StatusNotAcceptable, errors.New("Only graph can be outputted as dot"))
}
} else if strings.Contains(r.Header.Get("Accept"), "vnd.tcpdump.pcap") {
if rawPacketsTraversal, ok := res.(*ftraversal.RawPacketsTraversalStep); ok {
if rawPacketsTraversal, ok := res.(*ge.RawPacketsTraversalStep); ok {
values := rawPacketsTraversal.Values()
if len(values) == 0 {
writeError(w, http.StatusNotFound, errors.New("No raw packet found, please check your Gremlin request and the time context"))
Expand Down
3 changes: 2 additions & 1 deletion cmd/client/gremlin.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/flow"
shttp "github.com/skydive-project/skydive/http"
"github.com/skydive-project/skydive/topology"
"github.com/skydive-project/skydive/topology/graph"
)

Expand Down Expand Up @@ -184,7 +185,7 @@ func flatMetrictoTimedMetric(flat map[string]interface{}) (*common.TimedMetric,
}
tm.Metric = &metric
} else {
metric := graph.InterfaceMetric{}
metric := topology.InterfaceMetric{}
if err := mapstructure.WeakDecode(flat, &metric); err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions flow/ondemand/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import (
"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/etcd"
"github.com/skydive-project/skydive/flow/ondemand"
ge "github.com/skydive-project/skydive/gremlin/traversal"
shttp "github.com/skydive-project/skydive/http"
"github.com/skydive-project/skydive/logging"
"github.com/skydive-project/skydive/topology"
"github.com/skydive-project/skydive/topology/graph"
)

Expand Down Expand Up @@ -184,7 +184,7 @@ func (o *OnDemandProbeClient) unregisterProbe(node *graph.Node, capture *api.Cap
}

func (o *OnDemandProbeClient) applyGremlinExpr(query string) []interface{} {
res, err := topology.ExecuteGremlinQuery(o.graph, query)
res, err := ge.TopologyGremlinQuery(o.graph, query)
if err != nil {
logging.GetLogger().Errorf("Gremlin error: %s", err.Error())
return nil
Expand Down Expand Up @@ -285,7 +285,7 @@ func (o *OnDemandProbeClient) unregisterCapture(capture *api.Capture) {
delete(o.captures, capture.UUID)
o.Unlock()

res, err := topology.ExecuteGremlinQuery(o.graph, capture.GremlinQuery)
res, err := ge.TopologyGremlinQuery(o.graph, capture.GremlinQuery)
if err != nil {
logging.GetLogger().Errorf("Gremlin error: %s", err.Error())
return
Expand Down
26 changes: 8 additions & 18 deletions flow/traversal/traversal.go → gremlin/traversal/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,6 @@ import (
"github.com/skydive-project/skydive/topology/graph/traversal"
)

const (
traversalFlowToken traversal.Token = 1001
traversalHopsToken traversal.Token = 1002
traversalNodesToken traversal.Token = 1003
traversalCaptureNodeToken traversal.Token = 1004
traversalAggregatesToken traversal.Token = 1005
traversalRawPacketsToken traversal.Token = 1006
traversalBpfToken traversal.Token = 1007
)

const (
defaultSortBy = "Last"
)
Expand Down Expand Up @@ -507,9 +497,9 @@ func (f *FlowTraversalStep) PropertyKeys(keys ...interface{}) *traversal.GraphTr
}

// Metrics returns flow metric counters
func (f *FlowTraversalStep) Metrics() *traversal.MetricsTraversalStep {
func (f *FlowTraversalStep) FlowMetrics() *MetricsTraversalStep {
if f.error != nil {
return traversal.NewMetricsTraversalStep(nil, nil, f.error)
return NewMetricsTraversalStep(nil, nil, f.error)
}

var flowMetrics map[string][]*common.TimedMetric
Expand All @@ -523,7 +513,7 @@ func (f *FlowTraversalStep) Metrics() *traversal.MetricsTraversalStep {
flowFilter := flow.NewFilterForFlowSet(f.flowset)
f.flowSearchQuery.Filter = filters.NewAndFilter(f.flowSearchQuery.Filter, flowFilter)
} else if f.flowSearchQuery.Filter == nil {
return traversal.NewMetricsTraversalStep(nil, nil, errors.New("Unable to filter flows"))
return NewMetricsTraversalStep(nil, nil, errors.New("Unable to filter flows"))
}

fr := filters.Range{To: context.TimeSlice.Last}
Expand All @@ -538,7 +528,7 @@ func (f *FlowTraversalStep) Metrics() *traversal.MetricsTraversalStep {

var err error
if flowMetrics, err = f.Storage.SearchMetrics(f.flowSearchQuery, metricFilter); err != nil {
return traversal.NewMetricsTraversalStep(nil, nil, f.error)
return NewMetricsTraversalStep(nil, nil, f.error)
}
} else {
flowMetrics = make(map[string][]*common.TimedMetric, len(f.flowset.Flows))
Expand All @@ -562,7 +552,7 @@ func (f *FlowTraversalStep) Metrics() *traversal.MetricsTraversalStep {
}
}

return traversal.NewMetricsTraversalStep(f.GraphTraversal, flowMetrics, nil)
return NewMetricsTraversalStep(f.GraphTraversal, flowMetrics, nil)
}

// Values returns list of raw packets
Expand Down Expand Up @@ -953,7 +943,7 @@ func (s *FlowGremlinTraversalStep) Reduce(next traversal.GremlinTraversalStep) t
}

switch next.(type) {
case *traversal.GremlinTraversalStepMetrics:
case *MetricsGremlinTraversalStep:
s.metricsNextStep = true
case *RawPacketsGremlinTraversalStep:
s.rawpacketsNextStep = true
Expand Down Expand Up @@ -1048,8 +1038,8 @@ func (s *CaptureNodeGremlinTraversalStep) Context() *traversal.GremlinTraversalC
// Exec Aggregates step
func (a *AggregatesGremlinTraversalStep) Exec(last traversal.GraphTraversalStep) (traversal.GraphTraversalStep, error) {
switch last.(type) {
case *traversal.MetricsTraversalStep:
mts := last.(*traversal.MetricsTraversalStep)
case *MetricsTraversalStep:
mts := last.(*MetricsTraversalStep)
return mts.Aggregates(), nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/flow"
"github.com/skydive-project/skydive/topology/graph/traversal"
)

func TestFlowMetricsAggregates(t *testing.T) {
Expand Down Expand Up @@ -100,7 +99,7 @@ func TestFlowMetricsAggregates(t *testing.T) {
},
},
}
step := traversal.NewMetricsTraversalStep(nil, metrics, nil)
step := NewMetricsTraversalStep(nil, metrics, nil)

metrics = map[string][]*common.TimedMetric{
"Aggregated": {
Expand Down Expand Up @@ -142,7 +141,7 @@ func TestFlowMetricsAggregates(t *testing.T) {
},
},
}
expected := traversal.NewMetricsTraversalStep(nil, metrics, nil)
expected := NewMetricsTraversalStep(nil, metrics, nil)

got := step.Aggregates()

Expand Down
Loading

0 comments on commit 6d25592

Please sign in to comment.