From b64a5fa23ea26f06a0018337bd7524b6dd70c219 Mon Sep 17 00:00:00 2001 From: hit9 Date: Thu, 3 Mar 2016 22:00:57 +0800 Subject: [PATCH] Add FilterCost to health info --- detector/detector.go | 15 +++++++++------ health/health.go | 34 +++++++++++++++++++++++++++++----- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/detector/detector.go b/detector/detector.go index 1ac48f9..748352f 100644 --- a/detector/detector.go +++ b/detector/detector.go @@ -7,7 +7,6 @@ import ( "fmt" "net" "path/filepath" - "time" "github.com/eleme/banshee/config" "github.com/eleme/banshee/filter" @@ -15,7 +14,9 @@ import ( "github.com/eleme/banshee/models" "github.com/eleme/banshee/storage" "github.com/eleme/banshee/storage/indexdb" + "github.com/eleme/banshee/util" "github.com/eleme/banshee/util/log" + "github.com/eleme/banshee/util/mathutil" ) // Timeout in milliseconds. @@ -125,8 +126,7 @@ func (d *Detector) handle(conn net.Conn) { // func (d *Detector) process(m *models.Metric) { health.IncrNumMetricIncomed(1) - // Time it. - startAt := time.Now() + timer := util.NewTimer() // Match ok, rules := d.match(m) if !ok { @@ -146,7 +146,7 @@ func (d *Detector) process(m *models.Metric) { d.output(m) } // Time end. - elapsed := float64(time.Since(startAt).Nanoseconds()) / float64(1000*1000) + elapsed := timer.Elapsed() if elapsed > timeout { log.Warn("detection is slow: %.2fms", elapsed) } @@ -161,7 +161,10 @@ func (d *Detector) process(m *models.Metric) { // func (d *Detector) match(m *models.Metric) (bool, []*models.Rule) { // Check rules. + timer := util.NewTimer() rules := d.flt.MatchedRules(m) + elapsed := timer.Elapsed() + health.AddFilterCost(elapsed) if len(rules) == 0 { // Hit no rules. return false, rules @@ -382,8 +385,8 @@ func (d *Detector) div3Sigma(m *models.Metric, vals []float64) { return } // Values average and standard deviation. - avg := average(vals) - std := stdDev(vals, avg) + avg := mathutil.Average(vals) + std := mathutil.StdDev(vals, avg) // Set metric average m.Average = avg // Set metric score diff --git a/health/health.go b/health/health.go index 7e91a35..9856239 100644 --- a/health/health.go +++ b/health/health.go @@ -4,6 +4,7 @@ package health import ( "github.com/eleme/banshee/storage" + "github.com/eleme/banshee/util/mathutil" "sync" "sync/atomic" "time" @@ -15,6 +16,9 @@ const AggregationInterval int = 5 * 60 // max length for detectionCosts. const maxDetectionCostsLen = 100 * 1024 +// max length for filterCosets. +const maxFilterCostsLen = 100 * 1024 + // Info is the stats container. type Info struct { lock sync.RWMutex @@ -24,6 +28,7 @@ type Info struct { NumClients int64 `json:"numClients"` // Aggregation DetectionCost float64 `json:"detectionCost"` // ms + FilterCost float64 `json:"filterCost"` // ms NumMetricIncomed int64 `json:"numMetricIncomed"` NumMetricDetected int64 `json:"numMetricDetected"` NumAlertingEvents int64 `json:"numAlertingEvents"` @@ -38,6 +43,7 @@ func (info *Info) copy() *Info { NumIndexTotal: info.NumIndexTotal, NumClients: info.NumClients, DetectionCost: info.DetectionCost, + FilterCost: info.FilterCost, NumMetricIncomed: info.NumMetricIncomed, NumMetricDetected: info.NumMetricDetected, NumAlertingEvents: info.NumAlertingEvents, @@ -55,6 +61,8 @@ type hub struct { // Aggregation detectionCosts []float64 detectionCostsLock sync.Mutex + filterCosts []float64 + filterCostsLock sync.Mutex numMetricIncomed int64 numMetricDetected int64 numAlertingEvents int64 @@ -92,6 +100,15 @@ func AddDetectionCost(n float64) { } } +// AddFilterCost appends cost to FilterCosts. +func AddFilterCost(n float64) { + h.filterCostsLock.Lock() + defer h.filterCostsLock.Unlock() + if len(h.filterCosts) < maxFilterCostsLen { + h.filterCosts = append(h.filterCosts, n) + } +} + // IncrNumMetricIncomed increments NumMetricIncomed by n. func IncrNumMetricIncomed(n int64) { atomic.AddInt64(&h.numMetricIncomed, n) @@ -127,14 +144,20 @@ func aggregateDetectionCost() { defer h.info.lock.Unlock() h.detectionCostsLock.Lock() defer h.detectionCostsLock.Unlock() - var sum float64 - for i := 0; i < len(h.detectionCosts); i++ { - sum += h.detectionCosts[i] - } - h.info.DetectionCost = sum / float64(len(h.detectionCosts)) + h.info.DetectionCost = mathutil.Average(h.detectionCosts) h.detectionCosts = h.detectionCosts[:0] } +// Aggregate FilterCost. +func aggregationFilterCost() { + h.info.lock.Lock() + defer h.info.lock.Unlock() + h.filterCostsLock.Lock() + defer h.filterCostsLock.Unlock() + h.info.FilterCost = mathutil.Average(h.filterCosts) + h.filterCosts = h.filterCosts[:0] +} + // Aggregate NumMetricIncomed. func aggregateNumMetricIncomed() { h.info.lock.Lock() @@ -171,5 +194,6 @@ func Start() { aggregateNumMetricIncomed() aggregateNumMetricDetected() aggregateNumAlertingEvents() + aggregationFilterCost() } }