Skip to content

Commit

Permalink
libvirt: initial version of libvirt probe
Browse files Browse the repository at this point in the history
Libvirt probe dynamically polls the status of libvirt domains and
their network interfaces. For each new interface, it tries to enrich
the coresponding tun device with the name of the domain, the guest mac
and the complete address of the device on the guest pci bus.

The probe also adds one node per domain that tracks the status of the
libvirt domain and the association to interfaces.

The probe relies on libvirt-go the official go binding for libvirt
that requires the c libvirt library. It is compiled only under Linux.

closes #1410
Co-authored-by: Pierre Crégut <pierre.cregut@orange.com>
Co-authored-by: Sergey Glazyrin <s.glazyrin@partner.samsung.com>
  • Loading branch information
pierrecregut authored and lebauce committed Nov 9, 2018
1 parent 58b8c5d commit 4549995
Show file tree
Hide file tree
Showing 8 changed files with 372 additions and 0 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ BOOTSTRAP_ARGS?=
BUILD_TAGS?=$(TAGS)
WITH_LXD?=true
WITH_OPENCONTRAIL?=true
WITH_LIBVIRT?=true

export PATH:=$(BUILD_TOOLS):$(PATH)

Expand Down Expand Up @@ -172,6 +173,10 @@ ifeq ($(WITH_LXD), true)
BUILD_TAGS+=lxd
endif

ifeq ($(WITH_LIBVIRT), true)
BUILD_TAGS+=libvirt
endif

STATIC_LIBS_ABS := $(addprefix $(STATIC_DIR)/,$(STATIC_LIBS))

.PHONY: all install
Expand Down
7 changes: 7 additions & 0 deletions agent/probes.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/skydive-project/skydive/probe"
"github.com/skydive-project/skydive/topology/graph"
"github.com/skydive-project/skydive/topology/probes/docker"
"github.com/skydive-project/skydive/topology/probes/libvirt"
"github.com/skydive-project/skydive/topology/probes/lldp"
"github.com/skydive-project/skydive/topology/probes/lxd"
"github.com/skydive-project/skydive/topology/probes/netlink"
Expand Down Expand Up @@ -107,6 +108,12 @@ func NewTopologyProbeBundleFromConfig(g *graph.Graph, hostNode *graph.Node) (*pr
probes[t] = opencontrail
case "socketinfo":
probes[t] = socketinfo.NewSocketInfoProbe(g, hostNode)
case "libvirt":
libvirt, err := libvirt.NewProbeFromConfig(g, hostNode)
if err != nil {
return nil, fmt.Errorf("Failed to initialize Libvirt probe: %s", err)
}
probes[t] = libvirt
default:
logging.GetLogger().Errorf("unknown probe type %s", t)
}
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func init() {
cfg.SetDefault("agent.flow.pcapsocket.max_port", 8132)
cfg.SetDefault("agent.listen", "127.0.0.1:8081")
cfg.SetDefault("agent.topology.probes", []string{"ovsdb"})
cfg.SetDefault("agent.topology.libvirt.url", "qemu:///system")
cfg.SetDefault("agent.topology.netlink.metrics_update", 30)
cfg.SetDefault("agent.topology.neutron.domain_name", "Default")
cfg.SetDefault("agent.topology.neutron.endpoint_type", "public")
Expand Down
1 change: 1 addition & 0 deletions contrib/ansible/roles/skydive_dev/tasks/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- rpm-build
- rpmlint
- libvirt
- libvirt-devel
- libvirt-client
- vagrant-libvirt
- ansible
Expand Down
3 changes: 3 additions & 0 deletions etc/skydive.yml.default
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ agent:
interfaces:
# - eth0

libvirt:
# url: qemu:///system

capture:
# Period in second to get capture stats from the probe. Note this
# stats_update: 1
Expand Down
302 changes: 302 additions & 0 deletions topology/probes/libvirt/libvirt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
// +build libvirt,linux

/*
* Copyright (C) 2018 Orange.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package libvirt

import (
"context"
"encoding/xml"
"fmt"
"sync"

"github.com/skydive-project/skydive/config"

libvirtgo "github.com/libvirt/libvirt-go"
"github.com/skydive-project/skydive/logging"
"github.com/skydive-project/skydive/topology/graph"
)

// Probe is the libvirt probe
type Probe struct {
sync.Mutex
graph *graph.Graph // the graph
conn *libvirtgo.Connect // libvirt conection
interfaceMap map[string]*Interface // Found interfaces not yet connected.
cidLifecycle int // libvirt callback id of monitor to unregister
cidDevAdded int // second monitor on devices added to domains
uri string // uri of the libvirt connection
cancel context.CancelFunc // cancel function
root *graph.Node // root node for ownership
tunProcessor *graph.Processor // metadata indexer
}

// Address describes the XML coding of the pci addres of an interface in libvirt
type Address struct {
Type string `xml:"type,attr"`
Domain string `xml:"domain,attr"`
Bus string `xml:"bus,attr"`
Slot string `xml:"slot,attr"`
Function string `xml:"function,attr"`
}

// DomainStateMap stringifies the state of a domain
var DomainStateMap map[libvirtgo.DomainState]string = map[libvirtgo.DomainState]string{
libvirtgo.DOMAIN_NOSTATE: "UNDEFINED",
libvirtgo.DOMAIN_RUNNING: "UP",
libvirtgo.DOMAIN_BLOCKED: "BLOCKED",
libvirtgo.DOMAIN_PAUSED: "PAUSED",
libvirtgo.DOMAIN_SHUTDOWN: "DOWN",
libvirtgo.DOMAIN_CRASHED: "CRASHED",
libvirtgo.DOMAIN_PMSUSPENDED: "PMSUSPENDED",
libvirtgo.DOMAIN_SHUTOFF: "DOWN",
}

// Interface is XML coding of an interface in libvirt
type Interface struct {
Mac struct {
Address string `xml:"address,attr"`
} `xml:"mac"`
Target struct {
Device string `xml:"dev,attr"`
} `xml:"target"`
Address Address `xml:"address"`
Alias struct {
Name string `xml:"name,attr"`
} `xml:"alias"`
Host *graph.Node `xml:"-"`
}

// Domain is the subset of XML coding of a domain in libvirt
type Domain struct {
Interfaces []Interface `xml:"devices>interface"`
}

// getDomainInterfaces uses libvirt to get information on the interfaces of a
// domain.
func (probe *Probe) getDomainInterfaces(
domain *libvirtgo.Domain, // domain to query
domainNode *graph.Node, // Node representing the domain
constraint string, // to restrict the search to a single interface (by alias)
) (interfaces []*Interface) {
rawXML, err := domain.GetXMLDesc(0)
if err != nil {
logging.GetLogger().Errorf("Cannot get XMLDesc: %s", err)
return
}
d := Domain{}
if err = xml.Unmarshal([]byte(rawXML), &d); err != nil {
logging.GetLogger().Errorf("XML parsing error: %s", err)
return
}
for _, itf := range d.Interfaces {
if constraint == "" || constraint == itf.Alias.Name {
itf.Host = domainNode
interfaces = append(interfaces, &itf)
}
}
return
}

// registerInterfaces puts the information collected in the graph
// interfaces is an array of collected information.
func (probe *Probe) registerInterfaces(interfaces []*Interface) {
probe.graph.Lock()
defer probe.graph.Unlock()
for _, itf := range interfaces {
name := itf.Target.Device
if name == "" {
continue
}
logging.GetLogger().Debugf(
"Libvirt interface %s on %s", name, itf.Host)
probe.tunProcessor.DoAction(itf, name)
}
}

// ProcessNode adds the libvirt interface information to a node of the graph
func (itf *Interface) ProcessNode(g *graph.Graph, node *graph.Node) bool {
logging.GetLogger().Debugf("enrich %s", itf.Alias.Name)
tr := g.StartMetadataTransaction(node)
tr.AddMetadata("Libvirt.MAC", itf.Mac.Address)
tr.AddMetadata("Libvirt.Domain", itf.Host)
address := itf.Address
formatted := fmt.Sprintf(
"%s:%s.%s.%s.%s", address.Type, address.Domain, address.Bus,
address.Slot, address.Function)
tr.AddMetadata("Libvirt.Address", formatted)
tr.AddMetadata("Libvirt.Alias", itf.Alias.Name)
tr.AddMetadata("PeerIntfMAC", itf.Mac.Address)
tr.Commit()
if !g.AreLinked(node, itf.Host, graph.Metadata{"RelationType": "vlayer2"}) {
g.Link(node, itf.Host, graph.Metadata{"RelationType": "vlayer2"})
}
return false
}

// getDomain access the graph node representing a libvirt domain
func (probe *Probe) getDomain(d *libvirtgo.Domain) *graph.Node {
domainName, _ := d.GetName()
probe.graph.RLock()
defer probe.graph.RUnlock()
return probe.graph.LookupFirstNode(graph.Metadata{"Name": domainName, "Type": "libvirt"})
}

// createOrUpdateDomain creates a new graph node representing a libvirt domain
// if necessary and updates its state.
func (probe *Probe) createOrUpdateDomain(d *libvirtgo.Domain) *graph.Node {
g := probe.graph
g.Lock()
defer g.Unlock()
domainName, _ := d.GetName()
metadata := graph.Metadata{
"Name": domainName,
"Type": "libvirt",
}
domainNode := g.LookupFirstNode(metadata)
if domainNode == nil {
domainNode = g.NewNode(graph.GenID(), metadata)
g.Link(probe.root, domainNode, graph.Metadata{"RelationType": "ownership"})
}
state, _, err := d.GetState()
if err != nil {
logging.GetLogger().Errorf("Cannot update domain state for %s", domainName)
} else {
tr := g.StartMetadataTransaction(domainNode)
defer tr.Commit()
tr.AddMetadata("State", DomainStateMap[state])
}
return domainNode
}

// deleteDomain deletes the graph node representing a libvirt domain
func (probe *Probe) deleteDomain(d *libvirtgo.Domain) {
domainNode := probe.getDomain(d)
if domainNode != nil {
probe.graph.Lock()
defer probe.graph.Unlock()
probe.graph.DelNode(domainNode)
}
}

// Start get all domains attached to a libvirt connection
func (probe *Probe) Start() {
// The event loop must be registered WITH its poll loop active BEFORE the
// connection is opened. Otherwise it just does not work.
if err := libvirtgo.EventRegisterDefaultImpl(); err != nil {
logging.GetLogger().Errorf("libvirt event handler: %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
probe.cancel = cancel
go func() {
for ctx.Err() == nil {
if err := libvirtgo.EventRunDefaultImpl(); err != nil {
logging.GetLogger().Errorf("libvirt poll loop problem: %s", err)
}
}
}()
conn, err := libvirtgo.NewConnectReadOnly(probe.uri)
if err != nil {
logging.GetLogger().Errorf("Failed to create libvirt connect")
return
}
probe.conn = conn
callback := func(
c *libvirtgo.Connect, d *libvirtgo.Domain,
event *libvirtgo.DomainEventLifecycle,
) {
switch event.Event {
case libvirtgo.DOMAIN_EVENT_UNDEFINED:
probe.deleteDomain(d)
case libvirtgo.DOMAIN_EVENT_STARTED:
domainNode := probe.createOrUpdateDomain(d)
interfaces := probe.getDomainInterfaces(d, domainNode, "")
probe.registerInterfaces(interfaces)
case libvirtgo.DOMAIN_EVENT_DEFINED, libvirtgo.DOMAIN_EVENT_SUSPENDED,
libvirtgo.DOMAIN_EVENT_RESUMED, libvirtgo.DOMAIN_EVENT_STOPPED,
libvirtgo.DOMAIN_EVENT_SHUTDOWN, libvirtgo.DOMAIN_EVENT_PMSUSPENDED,
libvirtgo.DOMAIN_EVENT_CRASHED:
probe.createOrUpdateDomain(d)
}
}
probe.cidLifecycle, err = conn.DomainEventLifecycleRegister(nil, callback)
if err != nil {
logging.GetLogger().Errorf(
"Could not register the lifecycle event handler %s", err)
}
callbackDeviceAdded := func(
c *libvirtgo.Connect, d *libvirtgo.Domain,
event *libvirtgo.DomainEventDeviceAdded,
) {
domainNode := probe.getDomain(d)
interfaces := probe.getDomainInterfaces(d, domainNode, event.DevAlias)
probe.registerInterfaces(interfaces) // 0 or 1 device changed.
}
probe.cidDevAdded, err = conn.DomainEventDeviceAddedRegister(nil, callbackDeviceAdded)
if err != nil {
logging.GetLogger().Errorf(
"Could not register the device added event handler %s", err)
}
domains, err := probe.conn.ListAllDomains(0)
if err != nil {
logging.GetLogger().Error(err)
return
}
for _, domain := range domains {
domainNode := probe.createOrUpdateDomain(&domain)
interfaces := probe.getDomainInterfaces(&domain, domainNode, "")
probe.registerInterfaces(interfaces)
}
}

// Stop stops the probe
func (probe *Probe) Stop() {
probe.cancel()
probe.tunProcessor.Stop()
if probe.cidLifecycle != -1 {
probe.conn.DomainEventDeregister(probe.cidLifecycle)
probe.conn.DomainEventDeregister(probe.cidDevAdded)
}
probe.conn.Close()
}

// NewProbe creates a libvirt topology probe
func NewProbe(g *graph.Graph, uri string, root *graph.Node) *Probe {
tunProcessor := graph.NewProcessor(g, g, graph.Metadata{"Type": "tun"}, "Name")
probe := &Probe{
graph: g,
interfaceMap: make(map[string]*Interface),
cidLifecycle: -1,
uri: uri,
root: root,
tunProcessor: tunProcessor,
}
tunProcessor.Start()
return probe
}

// NewProbeFromConfig initializes the probe
func NewProbeFromConfig(g *graph.Graph, root *graph.Node) (*Probe, error) {
uri := config.GetString("agent.topology.libvirt.url")
return NewProbe(g, uri, root), nil
}
Loading

0 comments on commit 4549995

Please sign in to comment.