Skip to content

Commit

Permalink
ovn: make use of retry probe wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
lebauce committed Jul 30, 2019
1 parent ec9267f commit 0390317
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 119 deletions.
2 changes: 1 addition & 1 deletion contrib/vagrant/Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ $skydive_extra_config = eval(ANSIBLE_EXTRA_CONFIG)
$skydive_extra_config["http.ws.pong_timeout"] = 10
$skydive_extra_config["agent.topology.probes"] = ["ovsdb", "docker"]
$skydive_extra_config["agent.topology.netlink.metrics_update"] = 5
$skydive_extra_config["analyzer.topology.ovn.address"] = "tcp://127.0.0.1:6641"
$skydive_extra_config["analyzer.topology.ovn.address"] = "tcp:127.0.0.1:6641"

if DEVMODE then
# default config from tests/tests.go:testConfig
Expand Down
6 changes: 3 additions & 3 deletions etc/skydive.yml.default
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ analyzer:

ovn:
# OVN northbound address. Format can be either:
# * tcp://addr:port
# * unix:///var/run/openvswitch/ovnnb_db.sock
# address: unix:///var/run/openvswitch/ovnnb_db.sock
# * tcp:addr:port
# * unix:/var/run/openvswitch/ovnnb_db.sock
# address: unix:/var/run/openvswitch/ovnnb_db.sock

replication:
# debug: false
Expand Down
223 changes: 114 additions & 109 deletions topology/probes/ovn/ovn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
package ovn

import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/probe"
"github.com/skydive-project/skydive/topology/probes"

goovn "github.com/ebay/go-ovn"
"github.com/skydive-project/skydive/graffiti/graph"
Expand All @@ -38,15 +41,13 @@ type ovnEvent func()
type Probe struct {
graph.ListenerHandler
graph *graph.Graph
wg sync.WaitGroup
socketfile string
protocol string
server string
port int
ovndbapi goovn.OVNDBApi
address string
ovndbapi goovn.Client
switchPorts map[string]*goovn.LogicalSwitch
eventChan chan ovnEvent
bundle *probe.Bundle
aclIndexer *graph.Indexer
ifaces *graph.MetadataIndexer
lsIndexer *graph.Indexer
lspIndexer *graph.Indexer
lrIndexer *graph.Indexer
Expand Down Expand Up @@ -127,7 +128,7 @@ type switchPortLinker struct {
func (l *switchPortLinker) GetABLinks(lsNode *graph.Node) (edges []*graph.Edge) {
probe := l.probe
name, _ := lsNode.GetFieldString("Name")
ports, _ := l.probe.ovndbapi.GetLogicalSwitchPortsBySwitch(name)
ports, _ := l.probe.ovndbapi.LSPList(name)
for _, lp := range ports {
if lpNode, _ := probe.lspIndexer.GetNode(lp.UUID); lpNode != nil {
link, err := topology.NewLink(probe.graph, lsNode, lpNode, topology.OwnershipLink, nil)
Expand All @@ -145,9 +146,9 @@ func (l *switchPortLinker) GetABLinks(lsNode *graph.Node) (edges []*graph.Edge)
func (l *switchPortLinker) GetBALinks(lpNode *graph.Node) (edges []*graph.Edge) {
probe := l.probe
uuid, _ := lpNode.GetFieldString("UUID")
switches, _ := l.probe.ovndbapi.GetLogicalSwitches()
switches, _ := l.probe.ovndbapi.LSList()
for _, ls := range switches {
ports, _ := l.probe.ovndbapi.GetLogicalSwitchPortsBySwitch(ls.Name)
ports, _ := l.probe.ovndbapi.LSPList(ls.Name)
for _, lp := range ports {
if lp.UUID == uuid {
if lsNode, _ := probe.lsIndexer.GetNode(ls.UUID); lsNode != nil {
Expand All @@ -172,7 +173,7 @@ type routerPortLinker struct {
func (l *routerPortLinker) GetABLinks(lrNode *graph.Node) (edges []*graph.Edge) {
probe := l.probe
name, _ := lrNode.GetFieldString("Name")
ports, _ := l.probe.ovndbapi.GetLogicalRouterPortsByRouter(name)
ports, _ := l.probe.ovndbapi.LRPList(name)
for _, lp := range ports {
if lrpNode, _ := probe.lrpIndexer.GetNode(lp.UUID); lrpNode != nil {
link, err := topology.NewLink(probe.graph, lrNode, lrpNode, topology.OwnershipLink, nil)
Expand All @@ -190,9 +191,9 @@ func (l *routerPortLinker) GetABLinks(lrNode *graph.Node) (edges []*graph.Edge)
func (l *routerPortLinker) GetBALinks(lrpNode *graph.Node) (edges []*graph.Edge) {
probe := l.probe
uuid, _ := lrpNode.GetFieldString("UUID")
routers, _ := l.probe.ovndbapi.GetLogicalRouters()
routers, _ := l.probe.ovndbapi.LRList()
for _, lr := range routers {
ports, _ := l.probe.ovndbapi.GetLogicalRouterPortsByRouter(lr.Name)
ports, _ := l.probe.ovndbapi.LRPList(lr.Name)
for _, lp := range ports {
if lp.UUID == uuid {
if lrNode, _ := probe.lrIndexer.GetNode(lr.UUID); lrNode != nil {
Expand All @@ -216,7 +217,7 @@ type aclLinker struct {
// GetABLinks returns all the links from a specified port group to its ACLs
func (l *aclLinker) GetABLinks(lsNode *graph.Node) (edges []*graph.Edge) {
name, _ := lsNode.GetFieldString("Name")
acls, _ := l.probe.ovndbapi.GetACLsBySwitch(name)
acls, _ := l.probe.ovndbapi.ACLList(name)
for _, acl := range acls {
if aclNode, _ := l.probe.aclIndexer.GetNode(acl.UUID); aclNode != nil {
if link, _ := topology.NewLink(l.probe.graph, lsNode, aclNode, topology.OwnershipLink, nil); link != nil {
Expand All @@ -230,9 +231,9 @@ func (l *aclLinker) GetABLinks(lsNode *graph.Node) (edges []*graph.Edge) {
// GetBALinks returns all the links from a port group to the specified ACL
func (l *aclLinker) GetBALinks(aclNode *graph.Node) (edges []*graph.Edge) {
uuid, _ := aclNode.GetFieldString("Name")
switches, _ := l.probe.ovndbapi.GetLogicalSwitches()
switches, _ := l.probe.ovndbapi.LSList()
for _, ls := range switches {
acls, _ := l.probe.ovndbapi.GetACLsBySwitch(ls.Name)
acls, _ := l.probe.ovndbapi.ACLList(ls.Name)
for _, acl := range acls {
if acl.UUID == uuid {
if lsNode, _ := l.probe.lsIndexer.GetNode(ls.UUID); lsNode != nil {
Expand Down Expand Up @@ -406,6 +407,14 @@ func (p *Probe) OnLogicalRouterPortDelete(lp *goovn.LogicalRouterPort) {
p.eventChan <- func() { p.unregisterNode(p.lrpIndexer, lp.UUID) }
}

// OnLogicalRouterStaticRouteCreate is called when a static route is added to a router
func (p *Probe) OnLogicalRouterStaticRouteCreate(lp *goovn.LogicalRouterStaticRoute) {
}

// OnLogicalRouterStaticRouteDelete is called when a static route is removed from a router
func (p *Probe) OnLogicalRouterStaticRouteDelete(lp *goovn.LogicalRouterStaticRoute) {
}

// OnQoSCreate is called when QoS is created
func (p *Probe) OnQoSCreate(*goovn.QoS) {
}
Expand Down Expand Up @@ -447,109 +456,90 @@ func (p *Probe) OnError(err error) {
logging.GetLogger().Error(err)
}

// Start the probe
func (p *Probe) Start() {
p.lsIndexer.Start()
p.lspIndexer.Start()
p.lrIndexer.Start()
p.lrpIndexer.Start()
p.aclIndexer.Start()
p.spLinker.Start()
p.rpLinker.Start()
p.aclLinker.Start()
p.srLinker.Start()
p.ifaceLinker.Start()
// OnDisconnected is called when the connection to OVSDB is lost
func (p *Probe) OnDisconnected() {
logging.GetLogger().Warning("disconnected from the OVSDB API")
close(p.eventChan)
}

// Do implements the probe main loop
func (p *Probe) Do(ctx context.Context, wg *sync.WaitGroup) error {
var err error
logging.GetLogger().Debugf("Trying to get an OVN DB api")
p.ovndbapi, err = goovn.GetInstance(p.socketfile, p.protocol, p.server, p.port, p)
cfg := &goovn.Config{
Addr: p.address,
SignalCB: p,
DisconnectCB: p.OnDisconnected,
}
p.ovndbapi, err = goovn.NewClient(cfg)
if err != nil {
logging.GetLogger().Error(err)
return
return err
}
logging.GetLogger().Debugf("Successfully got an OVN DB api")

p.graph.RLock()
p.ifaces.Sync()
p.graph.RUnlock()

p.bundle.Start()

// Initial synchronization
switches, _ := p.ovndbapi.GetLogicalSwitches()
switches, _ := p.ovndbapi.LSList()
for _, ls := range switches {
p.OnLogicalSwitchCreate(ls)

ports, _ := p.ovndbapi.GetLogicalSwitchPortsBySwitch(ls.Name)
ports, _ := p.ovndbapi.LSPList(ls.Name)
for _, lp := range ports {
p.OnLogicalPortCreate(lp)
}

acls, _ := p.ovndbapi.GetACLsBySwitch(ls.Name)
acls, _ := p.ovndbapi.ACLList(ls.Name)
for _, acl := range acls {
p.OnACLCreate(acl)
}
}

routers, _ := p.ovndbapi.GetLogicalRouters()
routers, _ := p.ovndbapi.LRList()

for _, lr := range routers {
p.OnLogicalRouterCreate(lr)

ports, _ := p.ovndbapi.GetLogicalRouterPortsByRouter(lr.Name)
ports, _ := p.ovndbapi.LRPList(lr.Name)
for _, lp := range ports {
p.OnLogicalRouterPortCreate(lp)
}
}

p.wg.Add(1)
wg.Add(1)

go func() {
defer p.wg.Done()

for eventCallback := range p.eventChan {
eventCallback()
defer func() {
wg.Done()
p.bundle.Stop()
}()

for {
select {
case eventCallback, ok := <-p.eventChan:
if !ok {
return
}
eventCallback()
case <-ctx.Done():
p.ovndbapi.Close()
return
}
}
}()
}

// Stop the probe
func (p *Probe) Stop() {
close(p.eventChan)
p.wg.Wait()
p.lsIndexer.Stop()
p.lspIndexer.Stop()
p.lrIndexer.Stop()
p.lrpIndexer.Stop()
p.aclIndexer.Stop()
p.spLinker.Stop()
p.aclLinker.Stop()
p.rpLinker.Stop()
p.ifaceLinker.Stop()
return nil
}

// NewProbe creates a new graph OVS database probe
func NewProbe(g *graph.Graph, address string) (*Probe, error) {
port, socketfile, server := 0, "", ""

protocol, target, err := common.ParseAddr(address)
if err != nil {
return nil, err
}

switch protocol {
case "unix":
protocol, socketfile = goovn.UNIX, target
case "tcp":

sa, err := common.ServiceAddressFromString(target)
if err != nil {
return nil, err
}
protocol, server, port = goovn.TCP, sa.Addr, sa.Port
default:
return nil, fmt.Errorf("unsupported protocol %s", protocol)
}

probe := &Probe{
func NewProbe(g *graph.Graph, address string) (probe.Handler, error) {
p := &Probe{
graph: g,
protocol: protocol,
socketfile: socketfile,
server: server,
port: port,
address: address,
eventChan: make(chan ovnEvent, 50),
aclIndexer: graph.NewIndexer(g, nil, uuidHasher, false),
lsIndexer: graph.NewIndexer(g, nil, uuidHasher, false),
Expand All @@ -558,51 +548,66 @@ func NewProbe(g *graph.Graph, address string) (*Probe, error) {
lrpIndexer: graph.NewIndexer(g, nil, uuidHasher, false),
}

p.bundle = &probe.Bundle{
Handlers: map[string]probe.Handler{
"aclIndexer": p.aclIndexer,
"lsIndexer": p.lsIndexer,
"lspIndexer": p.lspIndexer,
"lrIndexer": p.lrIndexer,
"lrpIndexer": p.lrpIndexer,
},
}

// Link logical switches to their ports
probe.spLinker = graph.NewResourceLinker(g,
[]graph.ListenerHandler{probe.lsIndexer},
[]graph.ListenerHandler{probe.lspIndexer},
&switchPortLinker{probe: probe}, nil)
p.spLinker = graph.NewResourceLinker(g,
[]graph.ListenerHandler{p.lsIndexer},
[]graph.ListenerHandler{p.lspIndexer},
&switchPortLinker{probe: p}, nil)
p.bundle.AddHandler("spLinker", p.spLinker)

// Link logical routers to their ports
probe.rpLinker = graph.NewResourceLinker(g,
[]graph.ListenerHandler{probe.lrIndexer},
[]graph.ListenerHandler{probe.lrpIndexer},
&routerPortLinker{probe: probe}, nil)
p.rpLinker = graph.NewResourceLinker(g,
[]graph.ListenerHandler{p.lrIndexer},
[]graph.ListenerHandler{p.lrpIndexer},
&routerPortLinker{probe: p}, nil)
p.bundle.AddHandler("rpLinker", p.rpLinker)

// Link ports switches to their ACLs
probe.aclLinker = graph.NewResourceLinker(g,
[]graph.ListenerHandler{probe.lspIndexer},
[]graph.ListenerHandler{probe.aclIndexer},
&aclLinker{probe: probe}, nil)
p.aclLinker = graph.NewResourceLinker(g,
[]graph.ListenerHandler{p.lspIndexer},
[]graph.ListenerHandler{p.aclIndexer},
&aclLinker{probe: p}, nil)
p.bundle.AddHandler("aclLinker", p.aclLinker)

// We create a metadata indexer linker to link the logical switch ports that have
// an Options.router-port attribute to the logical router port with the specified name
lspIndexer := graph.NewMetadataIndexer(g, probe.lspIndexer, nil, "OVN.Options.router-port")
lspIndexer.Start()
routerPortIndexer := graph.NewMetadataIndexer(g, p.lspIndexer, nil, "OVN.Options.router-port")
p.bundle.AddHandler("routerPortIndexer", routerPortIndexer)

lrpIndexer := graph.NewMetadataIndexer(g, probe.lrpIndexer, graph.Metadata{"Type": "logical_port"}, "Name")
lrpIndexer.Start()
lpIndexer := graph.NewMetadataIndexer(g, p.lrpIndexer, graph.Metadata{"Type": "logical_port"}, "Name")
p.bundle.AddHandler("lpIndexer", lpIndexer)

probe.srLinker = graph.NewMetadataIndexerLinker(g, lspIndexer, lrpIndexer, graph.Metadata{"RelationType": "layer2"})
p.srLinker = graph.NewMetadataIndexerLinker(g, routerPortIndexer, lpIndexer, graph.Metadata{"RelationType": "layer2"})
p.bundle.AddHandler("srLinker", p.srLinker)

// We create an other metadata indexer linker to link the OVS interfaces to their logical OVN port
// To do so, we use the ExtID.iface-id attribute to link an interface to the logical
// switch port with the specified name
ifaceIndexer := graph.NewMetadataIndexer(g, g, nil, "ExtID.iface-id")
ifaceIndexer.Start()
p.ifaces = graph.NewMetadataIndexer(g, g, nil, "ExtID.iface-id")
p.bundle.AddHandler("ifaces", p.ifaces)

lspIndexer = graph.NewMetadataIndexer(g, probe.lspIndexer, graph.Metadata{"Type": "logical_port"}, "Name")
lspIndexer.Start()
lpIndexer2 := graph.NewMetadataIndexer(g, p.lspIndexer, graph.Metadata{"Type": "logical_port"}, "Name")
p.bundle.AddHandler("lpIndexer2", lpIndexer2)

probe.ifaceLinker = graph.NewMetadataIndexerLinker(g, ifaceIndexer, lspIndexer, graph.Metadata{"RelationType": "mapping"})
p.ifaceLinker = graph.NewMetadataIndexerLinker(g, p.ifaces, lpIndexer2, graph.Metadata{"RelationType": "mapping"})
p.bundle.AddHandler("ifaceLinker", p.ifaceLinker)

// Handle linkers errors
probe.aclLinker.AddEventListener(probe)
probe.rpLinker.AddEventListener(probe)
probe.spLinker.AddEventListener(probe)
probe.srLinker.AddEventListener(probe)
probe.ifaceLinker.AddEventListener(probe)
p.aclLinker.AddEventListener(p)
p.rpLinker.AddEventListener(p)
p.spLinker.AddEventListener(p)
p.srLinker.AddEventListener(p)
p.ifaceLinker.AddEventListener(p)

return probe, nil
return probes.NewProbeWrapper(p), nil
}
Loading

0 comments on commit 0390317

Please sign in to comment.