Skip to content

Commit

Permalink
Add FilterCost to health info
Browse files Browse the repository at this point in the history
  • Loading branch information
hit9 committed Mar 3, 2016
1 parent 1c0e88a commit b64a5fa
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
15 changes: 9 additions & 6 deletions detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ import (
"fmt"
"net"
"path/filepath"
"time"

"github.com/eleme/banshee/config"
"github.com/eleme/banshee/filter"
"github.com/eleme/banshee/health"
"github.com/eleme/banshee/models"
"github.com/eleme/banshee/storage"
"github.com/eleme/banshee/storage/indexdb"
"github.com/eleme/banshee/util"
"github.com/eleme/banshee/util/log"
"github.com/eleme/banshee/util/mathutil"
)

// Timeout in milliseconds.
Expand Down Expand Up @@ -125,8 +126,7 @@ func (d *Detector) handle(conn net.Conn) {
//
func (d *Detector) process(m *models.Metric) {
health.IncrNumMetricIncomed(1)
// Time it.
startAt := time.Now()
timer := util.NewTimer()
// Match
ok, rules := d.match(m)
if !ok {
Expand All @@ -146,7 +146,7 @@ func (d *Detector) process(m *models.Metric) {
d.output(m)
}
// Time end.
elapsed := float64(time.Since(startAt).Nanoseconds()) / float64(1000*1000)
elapsed := timer.Elapsed()
if elapsed > timeout {
log.Warn("detection is slow: %.2fms", elapsed)
}
Expand All @@ -161,7 +161,10 @@ func (d *Detector) process(m *models.Metric) {
//
func (d *Detector) match(m *models.Metric) (bool, []*models.Rule) {
// Check rules.
timer := util.NewTimer()
rules := d.flt.MatchedRules(m)
elapsed := timer.Elapsed()
health.AddFilterCost(elapsed)
if len(rules) == 0 {
// Hit no rules.
return false, rules
Expand Down Expand Up @@ -382,8 +385,8 @@ func (d *Detector) div3Sigma(m *models.Metric, vals []float64) {
return
}
// Values average and standard deviation.
avg := average(vals)
std := stdDev(vals, avg)
avg := mathutil.Average(vals)
std := mathutil.StdDev(vals, avg)
// Set metric average
m.Average = avg
// Set metric score
Expand Down
34 changes: 29 additions & 5 deletions health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package health

import (
"github.com/eleme/banshee/storage"
"github.com/eleme/banshee/util/mathutil"
"sync"
"sync/atomic"
"time"
Expand All @@ -15,6 +16,9 @@ const AggregationInterval int = 5 * 60
// max length for detectionCosts.
const maxDetectionCostsLen = 100 * 1024

// max length for filterCosets.
const maxFilterCostsLen = 100 * 1024

// Info is the stats container.
type Info struct {
lock sync.RWMutex
Expand All @@ -24,6 +28,7 @@ type Info struct {
NumClients int64 `json:"numClients"`
// Aggregation
DetectionCost float64 `json:"detectionCost"` // ms
FilterCost float64 `json:"filterCost"` // ms
NumMetricIncomed int64 `json:"numMetricIncomed"`
NumMetricDetected int64 `json:"numMetricDetected"`
NumAlertingEvents int64 `json:"numAlertingEvents"`
Expand All @@ -38,6 +43,7 @@ func (info *Info) copy() *Info {
NumIndexTotal: info.NumIndexTotal,
NumClients: info.NumClients,
DetectionCost: info.DetectionCost,
FilterCost: info.FilterCost,
NumMetricIncomed: info.NumMetricIncomed,
NumMetricDetected: info.NumMetricDetected,
NumAlertingEvents: info.NumAlertingEvents,
Expand All @@ -55,6 +61,8 @@ type hub struct {
// Aggregation
detectionCosts []float64
detectionCostsLock sync.Mutex
filterCosts []float64
filterCostsLock sync.Mutex
numMetricIncomed int64
numMetricDetected int64
numAlertingEvents int64
Expand Down Expand Up @@ -92,6 +100,15 @@ func AddDetectionCost(n float64) {
}
}

// AddFilterCost appends cost to FilterCosts.
func AddFilterCost(n float64) {
h.filterCostsLock.Lock()
defer h.filterCostsLock.Unlock()
if len(h.filterCosts) < maxFilterCostsLen {
h.filterCosts = append(h.filterCosts, n)
}
}

// IncrNumMetricIncomed increments NumMetricIncomed by n.
func IncrNumMetricIncomed(n int64) {
atomic.AddInt64(&h.numMetricIncomed, n)
Expand Down Expand Up @@ -127,14 +144,20 @@ func aggregateDetectionCost() {
defer h.info.lock.Unlock()
h.detectionCostsLock.Lock()
defer h.detectionCostsLock.Unlock()
var sum float64
for i := 0; i < len(h.detectionCosts); i++ {
sum += h.detectionCosts[i]
}
h.info.DetectionCost = sum / float64(len(h.detectionCosts))
h.info.DetectionCost = mathutil.Average(h.detectionCosts)
h.detectionCosts = h.detectionCosts[:0]
}

// Aggregate FilterCost.
func aggregationFilterCost() {
h.info.lock.Lock()
defer h.info.lock.Unlock()
h.filterCostsLock.Lock()
defer h.filterCostsLock.Unlock()
h.info.FilterCost = mathutil.Average(h.filterCosts)
h.filterCosts = h.filterCosts[:0]
}

// Aggregate NumMetricIncomed.
func aggregateNumMetricIncomed() {
h.info.lock.Lock()
Expand Down Expand Up @@ -171,5 +194,6 @@ func Start() {
aggregateNumMetricIncomed()
aggregateNumMetricDetected()
aggregateNumAlertingEvents()
aggregationFilterCost()
}
}

0 comments on commit b64a5fa

Please sign in to comment.