Skip to content

Commit

Permalink
customize flow timeout by application
Browse files Browse the repository at this point in the history
  • Loading branch information
orozery committed Mar 4, 2019
1 parent bf87f6a commit a85d69e
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 0 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ func init() {
cfg.SetDefault("flow.expire", 600)
cfg.SetDefault("flow.update", 60)
cfg.SetDefault("flow.protocol", "udp")
cfg.SetDefault("flow.application_timeout.arp", 10)
cfg.SetDefault("flow.application_timeout.dns", 10)

cfg.SetDefault("host_id", host)

Expand Down
6 changes: 6 additions & 0 deletions etc/skydive.yml.default
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,12 @@ flow:
udp:
# 1194: OPENVPN

# application specific flow timeout, in seconds
# this timeout is enforced in addition to the general flow.expire timeout
application_timeout:
# - arp: 10
# - dns: 10

ui:
# Specify the extra assets folder. Javascript and CSS files present in this
# folder will be added to the WebUI.
Expand Down
14 changes: 14 additions & 0 deletions flow/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package flow

import (
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/google/gopacket/layers"

"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/config"
"github.com/skydive-project/skydive/filters"
"github.com/skydive-project/skydive/logging"
)
Expand Down Expand Up @@ -81,6 +83,7 @@ type Table struct {
tcpAssembler *TCPAssembler
flowOpts Opts
appPortMap *ApplicationPortMap
appTimeout map[string]int64
}

// OperationType operation type of a Flow in a flow table
Expand All @@ -102,6 +105,12 @@ type Operation struct {

// NewTable creates a new flow table
func NewTable(updateHandler *Handler, expireHandler *Handler, nodeTID string, opts ...TableOpts) *Table {
appTimeout := make(map[string]int64)
for key := range config.GetConfig().GetStringMap("flow.application_timeout") {
// convert seconds to milleseconds
appTimeout[strings.ToUpper(key)] = int64(1000 * config.GetConfig().GetInt("flow.application_timeout."+key))
}

t := &Table{
packetSeqChan: make(chan *PacketSequence, 1000),
flowChanOperation: make(chan *Operation, 1000),
Expand All @@ -116,6 +125,7 @@ func NewTable(updateHandler *Handler, expireHandler *Handler, nodeTID string, op
ipDefragger: NewIPDefragger(),
tcpAssembler: NewTCPAssembler(),
appPortMap: NewApplicationPortMapFromConfig(),
appTimeout: appTimeout,
}
if len(opts) > 0 {
t.Opts = opts[0]
Expand Down Expand Up @@ -252,6 +262,10 @@ func (ft *Table) update(updateFrom, updateTime int64) {
if f.FinishType != FlowFinishType_NOT_FINISHED {
delete(ft.table, k)
}
} else if updateTime-f.Last > ft.appTimeout[f.Application] && ft.appTimeout[f.Application] > 0 {
updatedFlows = append(updatedFlows, f)
f.FinishType = FlowFinishType_TIMEOUT
delete(ft.table, k)
} else {
f.LastUpdateMetric = &FlowMetric{Start: updateFrom, Last: updateTime}
}
Expand Down
35 changes: 35 additions & 0 deletions flow/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/google/gopacket/layers"
"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/config"
"github.com/skydive-project/skydive/filters"
)

Expand Down Expand Up @@ -202,3 +203,37 @@ func TestUpdate(t *testing.T) {
t.Errorf("Should have been notified : %+v", flow2)
}
}

func TestAppSpecificTimeout(t *testing.T) {
var received int
callback := func(f *FlowArray) {
received += len(f.Flows)
}
updHandler := NewFlowHandler(callback, time.Second)
expHandler := NewFlowHandler(func(f *FlowArray) {}, 300*time.Second)

config.GetConfig().Set("flow.application_timeout.arp", 10)
config.GetConfig().Set("flow.application_timeout.dns", 20)

table := NewTable(updHandler, expHandler, "", TableOpts{})

flowsTime := time.Now()

arpFlow, _ := table.getOrCreateFlow("arpFlow")
arpFlow.Last = common.UnixMillis(flowsTime)
arpFlow.Application = "ARP"

dnsFlow, _ := table.getOrCreateFlow("dnsFlow")
dnsFlow.Last = common.UnixMillis(flowsTime)
dnsFlow.Application = "DNS"

table.updateAt(flowsTime.Add(time.Duration(15) * time.Second))

if received == 0 || arpFlow.FinishType != FlowFinishType_TIMEOUT {
t.Errorf("Should have been notified : %+v", arpFlow)
}

if received > 1 || dnsFlow.FinishType != FlowFinishType_NOT_FINISHED {
t.Errorf("Should not have been notified : %+v", dnsFlow)
}
}

0 comments on commit a85d69e

Please sign in to comment.