Skip to content

Commit

Permalink
added statistics for dropped flows from flow table
Browse files Browse the repository at this point in the history
latest changes

assume stats map is found since it was loaded

fixed
  • Loading branch information
root committed Jun 11, 2019
1 parent 4679daa commit f2f082a
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 14 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func init() {
cfg.SetDefault("agent.flow.pcapsocket.bind_address", "127.0.0.1")
cfg.SetDefault("agent.flow.pcapsocket.min_port", 8100)
cfg.SetDefault("agent.flow.pcapsocket.max_port", 8132)
cfg.SetDefault("agent.flow.ebpf.kernel_scan_interval", 10)
cfg.SetDefault("agent.flow.sflow.bind_address", "127.0.0.1")
cfg.SetDefault("agent.flow.sflow.port_min", 6345)
cfg.SetDefault("agent.flow.sflow.port_max", 6355)
Expand Down Expand Up @@ -131,6 +132,7 @@ func init() {

cfg.SetDefault("flow.expire", 600)
cfg.SetDefault("flow.update", 60)
cfg.SetDefault("flow.max_entries", 500000)
cfg.SetDefault("flow.protocol", "udp")
cfg.SetDefault("flow.application_timeout.arp", 10)
cfg.SetDefault("flow.application_timeout.dns", 10)
Expand Down
8 changes: 8 additions & 0 deletions etc/skydive.yml.default
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@ agent:
# port_min: 6365
# port_max: 6375

ebpf:
# Interval in seconds between two successive scans of the kernel flow table
# for new flow updates when capturing in eBPF mode. Defaults to 10 seconds.
# kernel_scan_interval: 10

capture:
# Period in seconds to get capture stats from the probe. Note this
# stats_update: 1
Expand Down Expand Up @@ -481,6 +486,9 @@ flow:
# Protocol to use to send flows to the analyzer: websocket or udp
# protocol: udp

# Maximum number of flows kept in the userspace flow table
# max_entries: 500000

# Define the layer key mode used by default for captures. The key mode defines
# the layers used to identify a unique flow.
# * L2, this mode includes layer 2 and beyond.
Expand Down
20 changes: 16 additions & 4 deletions flow/probes/ebpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/skydive-project/skydive/api/types"
"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/config"
"github.com/skydive-project/skydive/flow"
"github.com/skydive-project/skydive/graffiti/graph"
"github.com/skydive-project/skydive/logging"
Expand All @@ -45,9 +46,7 @@ import (
import "C"

const (
BPF_ANY = 0
FLOW_TABLE_SZ = 500000
ebpfUpdate = 10 * time.Second
BPF_ANY = 0
)

// EBPFProbe the eBPF probe
Expand Down Expand Up @@ -77,6 +76,8 @@ func (p *EBPFProbe) run() {
_, flowEBPFChan, _ := p.flowTable.Start()
defer p.flowTable.Stop()

ebpfUpdate := time.Duration(config.GetConfig().GetInt("agent.flow.ebpf.kernel_scan_interval")) * time.Second
flowTableSz := config.GetConfig().GetInt("flow.max_entries")
var startKTimeNs int64
var start time.Time

Expand All @@ -91,6 +92,17 @@ func (p *EBPFProbe) run() {
for {
select {
case now := <-updateTicker.C:
if statsMap := p.module.Map("stats_map"); statsMap != nil {
var statsKey uint32
var statsVal int64

if p.module.LookupElement(statsMap, unsafe.Pointer(&statsKey), unsafe.Pointer(&statsVal)) == nil {
if statsVal > 0 {
logging.GetLogger().Warningf("flow table overflow, %d flows were dropped from kernel table", statsVal)
}
p.module.DeleteElement(statsMap, unsafe.Pointer(&statsKey))
}
}
// try to get start monotonic time
if startKTimeNs == 0 {
cmap := p.module.Map("u64_config_values")
Expand Down Expand Up @@ -133,7 +145,7 @@ func (p *EBPFProbe) run() {
flowEBPFChan <- &ebpfFlows[nextAvailablePtr]
nextAvailablePtr = (nextAvailablePtr + 1) % flowPoolSize
flowsRead++
if flowsRead == FLOW_TABLE_SZ {
if flowsRead == flowTableSz {
break
}
}
Expand Down
24 changes: 19 additions & 5 deletions flow/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type Table struct {
flowOpts Opts
appPortMap *ApplicationPortMap
appTimeout map[string]int64
removedFlows int
}

// OperationType operation type of a Flow in a flow table
Expand Down Expand Up @@ -122,7 +123,7 @@ func NewTable(updateHandler *Handler, expireHandler *Handler, nodeTID string, op
// convert seconds to milliseconds
appTimeout[strings.ToUpper(key)] = int64(1000 * config.GetConfig().GetInt("flow.application_timeout."+key))
}
LRU, _ := simplelru.NewLRU(500000, nil)
LRU, _ := simplelru.NewLRU(config.GetConfig().GetInt("flow.max_entries"), nil)
t := &Table{
packetSeqChan: make(chan *PacketSequence, 1000),
flowChanOperation: make(chan *Operation, 1000),
Expand Down Expand Up @@ -152,7 +153,6 @@ func NewTable(updateHandler *Handler, expireHandler *Handler, nodeTID string, op
ExtraLayers: t.Opts.ExtraLayers,
}

t.updateVersion = 0
return t
}

Expand Down Expand Up @@ -198,13 +198,17 @@ func (ft *Table) getOrCreateFlow(key string) (*Flow, bool) {
}

new := NewFlow()
ft.table.Add(key, new)
if ft.table.Add(key, new) {
ft.removedFlows++
}
return new, true
}

func (ft *Table) replaceFlow(key string, f *Flow) *Flow {
prev, _ := ft.table.Get(key)
ft.table.Add(key, f)
if ft.table.Add(key, f) {
ft.removedFlows++
}
if prev == nil {
return nil
}
Expand Down Expand Up @@ -464,7 +468,9 @@ func (ft *Table) processEBPFFlow(ebpfFlow *EBPFFlow, nfl *Flow) {
if !found {
keys, flows := ft.newFlowFromEBPF(ebpfFlow, key)
for i := range keys {
ft.table.Add(keys[i], flows[i])
if ft.table.Add(keys[i], flows[i]) {
ft.removedFlows++
}
}
return
}
Expand Down Expand Up @@ -518,6 +524,9 @@ func (ft *Table) Run() {
nowTicker := time.NewTicker(time.Second * 1)
defer nowTicker.Stop()

overFlowTicker := time.NewTicker(time.Second * 10)
defer overFlowTicker.Stop()

ph := Flow{} // placeholder to avoid memory allocation upon update
ph.TCPMetric = &TCPMetric{}
ph.Metric = &FlowMetric{}
Expand Down Expand Up @@ -552,6 +561,11 @@ func (ft *Table) Run() {
t := now.Add(-ctDuration)
ft.tcpAssembler.FlushOlderThan(t)
ft.ipDefragger.FlushOlderThan(t)
case <-overFlowTicker.C:
if ft.removedFlows > 0 {
logging.GetLogger().Warningf("flow table overflow, %d flows were dropped from userspace table", ft.removedFlows)
ft.removedFlows = 0
}
}
}
}
Expand Down
20 changes: 17 additions & 3 deletions probe/ebpf/flow-gre.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ MAP(u64_config_values) {
.max_entries = 1,
};

/* Single-entry hash map used as a drop counter: the value stored under key 0
 * is incremented each time bpf_map_update_element() on flow_table returns -1
 * for a new flow. The entry is created with value 1 on the first such failure. */
MAP(stats_map) {
.type = BPF_MAP_TYPE_HASH,
.key_size = sizeof(__u32),
.value_size = sizeof(__u64),
.max_entries = 1,
};

MAP(l2_table) {
.type = BPF_MAP_TYPE_ARRAY,
.key_size = sizeof(__u32),
Expand Down Expand Up @@ -507,8 +514,16 @@ int network_layer(struct __sk_buff *skb)
/* New flow */
new->start = new->last;
update_metrics(skb, new, 1);
bpf_map_update_element(&flow_table, &new->key, new, BPF_ANY);

if (bpf_map_update_element(&flow_table, &new->key, new, BPF_ANY) == -1) {
__u32 stats_key = 0;
__u64 stats_update_val = 1;
__u64 *stats_val = bpf_map_lookup_element(&stats_map,&stats_key);
if (stats_val == NULL) {
bpf_map_update_element(&stats_map, &stats_key, &stats_update_val, BPF_ANY);
} else {
__sync_fetch_and_add(stats_val, stats_update_val);
}
}
return 0;
}

Expand Down Expand Up @@ -545,4 +560,3 @@ int network_layer(struct __sk_buff *skb)
}

char _license[] LICENSE = "GPL";

20 changes: 18 additions & 2 deletions probe/ebpf/flow.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ MAP(u64_config_values) {
.max_entries = 1,
};

/* Single-entry hash map used as a drop counter: the value stored under key 0
 * is incremented each time bpf_map_update_element() on flow_table returns -1.
 * The entry is created with value 1 on the first such failure. */
MAP(stats_map) {
.type = BPF_MAP_TYPE_HASH,
.key_size = sizeof(__u32),
.value_size = sizeof(__u64),
.max_entries = 1,
};

MAP(flow_table) {
.type = BPF_MAP_TYPE_HASH,
.key_size = sizeof(__u64),
Expand Down Expand Up @@ -429,9 +436,18 @@ int bpf_flow_table(struct __sk_buff *skb)
__sync_fetch_and_add(&flow.start, tm);
__sync_fetch_and_add(&flow.last, tm);

bpf_map_update_element(&flow_table, &flow.key, &flow, BPF_ANY);
if (bpf_map_update_element(&flow_table, &flow.key, &flow, BPF_ANY) == -1) {
__u32 stats_key = 0;
__u64 stats_update_val = 1;
__u64 *stats_val = bpf_map_lookup_element(&stats_map,&stats_key);
if (stats_val == NULL) {
bpf_map_update_element(&stats_map, &stats_key, &stats_update_val, BPF_ANY);
} else {
__sync_fetch_and_add(stats_val, stats_update_val);
}
}
}

return 0;
}
char _license[] LICENSE = "GPL";
char _license[] LICENSE = "GPL";

0 comments on commit f2f082a

Please sign in to comment.