Refactor handling of MinMax functionality into RunningAggregator
This allows for easier addition of a sliding window at a later time.

It also makes `period` a generic argument for all aggregator plugins.
sparrc committed Oct 12, 2016
1 parent ef885ed commit fead808
Showing 7 changed files with 252 additions and 297 deletions.
60 changes: 28 additions & 32 deletions agent/agent.go
@@ -259,9 +259,6 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
for {
select {
case <-shutdown:
- for _, agg := range a.Config.Aggregators {
- agg.Aggregator.Stop()
- }
if len(outMetricC) > 0 {
// keep going until outMetricC is flushed
continue
@@ -273,7 +270,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
var dropOriginal bool
if !m.IsAggregate() {
for _, agg := range a.Config.Aggregators {
- if ok := agg.Apply(copyMetric(m)); ok {
+ if ok := agg.Add(copyMetric(m)); ok {
dropOriginal = true
}
}
@@ -315,22 +312,6 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
}
}

- func copyMetric(m telegraf.Metric) telegraf.Metric {
- t := time.Time(m.Time())
-
- tags := make(map[string]string)
- fields := make(map[string]interface{})
- for k, v := range m.Tags() {
- tags[k] = v
- }
- for k, v := range m.Fields() {
- fields[k] = v
- }
-
- out, _ := telegraf.NewMetric(m.Name(), tags, fields, t)
- return out
- }

// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup
@@ -367,18 +348,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
time.Sleep(time.Duration(i - (time.Now().UnixNano() % i)))
}

- // Start all Aggregators
- for _, aggregator := range a.Config.Aggregators {
- acc := NewAccumulator(aggregator, metricC)
- acc.SetPrecision(a.Config.Agent.Precision.Duration,
- a.Config.Agent.Interval.Duration)
- if err := aggregator.Aggregator.Start(acc); err != nil {
- log.Printf("[%s] failed to start, exiting\n%s\n",
- aggregator.Name(), err.Error())
- return err
- }
- }

wg.Add(1)
go func() {
defer wg.Done()
@@ -403,6 +372,33 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}(input, interval)
}

+ wg.Add(len(a.Config.Aggregators))
+ for _, aggregator := range a.Config.Aggregators {
+ go func(agg *models.RunningAggregator) {
+ defer wg.Done()
+ acc := NewAccumulator(agg, metricC)
+ acc.SetPrecision(a.Config.Agent.Precision.Duration,
+ a.Config.Agent.Interval.Duration)
+ agg.Run(acc, shutdown)
+ }(aggregator)
+ }

wg.Wait()
return nil
}

+ func copyMetric(m telegraf.Metric) telegraf.Metric {
+ t := time.Time(m.Time())
+
+ tags := make(map[string]string)
+ fields := make(map[string]interface{})
+ for k, v := range m.Tags() {
+ tags[k] = v
+ }
+ for k, v := range m.Fields() {
+ fields[k] = v
+ }
+
+ out, _ := telegraf.NewMetric(m.Name(), tags, fields, t)
+ return out
+ }
20 changes: 13 additions & 7 deletions aggregator.go
@@ -1,16 +1,22 @@
package telegraf

+ // Aggregator is an interface for implementing an Aggregator plugin.
+ // the RunningAggregator wraps this interface and guarantees that
+ // Add, Push, and Reset can not be called concurrently, so locking is not
+ // required when implementing an Aggregator plugin.
type Aggregator interface {
- // SampleConfig returns the default configuration of the Input
+ // SampleConfig returns the default configuration of the Input.
SampleConfig() string

- // Description returns a one-sentence description on the Input
+ // Description returns a one-sentence description on the Input.
Description() string

- // Apply the metric to the aggregator
- Apply(in Metric)
+ // Add the metric to the aggregator.
+ Add(in Metric)

- // Start starts the service filter with the given accumulator
- Start(acc Accumulator) error
- Stop()
+ // Push pushes the current aggregates to the accumulator.
+ Push(acc Accumulator)
+
+ // Reset resets the aggregators caches and aggregates.
+ Reset()
}
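For context, a plugin written against this new interface only folds metrics into its own state and reports them; the period handling now lives in RunningAggregator. The sketch below is not part of this commit (the commit's real example is the refactored MinMax plugin, which is not shown on this page), its names are invented, it assumes the existing Accumulator exposes AddFields(measurement, fields, tags), and it omits registration with the aggregator registry. A minimal "min"-only aggregator might look like:

package min

import "github.com/influxdata/telegraf"

// Compile-time check that *Min satisfies the new interface.
var _ telegraf.Aggregator = (*Min)(nil)

// Min keeps the smallest float64 value seen per field since the last Reset.
// Purely illustrative; names and field handling are assumptions.
type Min struct {
	cache map[string]map[string]float64 // measurement -> field -> minimum
}

func NewMin() *Min {
	return &Min{cache: make(map[string]map[string]float64)}
}

func (m *Min) SampleConfig() string {
	return `
  ## period and drop_original are now generic aggregator options,
  ## handled by the RunningAggregator rather than by each plugin.
  period = "30s"
  drop_original = false
`
}

func (m *Min) Description() string {
	return "Keep the minimum value of each numeric field (illustrative only)."
}

// Add folds one metric into the cache. RunningAggregator guarantees this is
// never called concurrently with Push or Reset, so no locking is needed.
func (m *Min) Add(in telegraf.Metric) {
	fields, ok := m.cache[in.Name()]
	if !ok {
		fields = make(map[string]float64)
		m.cache[in.Name()] = fields
	}
	for k, v := range in.Fields() {
		f, ok := v.(float64)
		if !ok {
			continue // this sketch only tracks float64 fields
		}
		if cur, seen := fields[k]; !seen || f < cur {
			fields[k] = f
		}
	}
}

// Push emits the current aggregates once per period.
func (m *Min) Push(acc telegraf.Accumulator) {
	for name, fields := range m.cache {
		out := make(map[string]interface{}, len(fields))
		for k, v := range fields {
			out[k+"_min"] = v
		}
		acc.AddFields(name, out, map[string]string{})
	}
}

// Reset clears the cache at the end of each period.
func (m *Min) Reset() {
	m.cache = make(map[string]map[string]float64)
}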
40 changes: 32 additions & 8 deletions internal/config/config.go
@@ -693,7 +693,7 @@ func (c *Config) addAggregator(name string, table *ast.Table) error {
}
aggregator := creator()

- aggregatorConfig, err := buildAggregator(name, table)
+ conf, err := buildAggregator(name, table)
if err != nil {
return err
}
@@ -702,12 +702,7 @@ func (c *Config) addAggregator(name string, table *ast.Table) error {
return err
}

- rf := &models.RunningAggregator{
- Aggregator: aggregator,
- Config: aggregatorConfig,
- }
-
- c.Aggregators = append(c.Aggregators, rf)
+ c.Aggregators = append(c.Aggregators, models.NewRunningAggregator(aggregator, conf))
return nil
}

@@ -818,14 +813,41 @@ func (c *Config) addInput(name string, table *ast.Table) error {

// buildAggregator TODO doc
func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, error) {
- conf := &models.AggregatorConfig{Name: name}
unsupportedFields := []string{"tagexclude", "taginclude"}
for _, field := range unsupportedFields {
if _, ok := tbl.Fields[field]; ok {
// TODO raise error because field is not supported
}
}

+ conf := &models.AggregatorConfig{Name: name}
+
+ if node, ok := tbl.Fields["period"]; ok {
+ if kv, ok := node.(*ast.KeyValue); ok {
+ if str, ok := kv.Value.(*ast.String); ok {
+ dur, err := time.ParseDuration(str.Value)
+ if err != nil {
+ return nil, err
+ }
+
+ conf.Period = dur
+ }
+ }
+ }
+
+ if node, ok := tbl.Fields["delay"]; ok {
+ if kv, ok := node.(*ast.KeyValue); ok {
+ if str, ok := kv.Value.(*ast.String); ok {
+ dur, err := time.ParseDuration(str.Value)
+ if err != nil {
+ return nil, err
+ }
+
+ conf.Delay = dur
+ }
+ }
+ }
+
if node, ok := tbl.Fields["drop_original"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
@@ -871,6 +893,8 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
}
}

+ delete(tbl.Fields, "period")
+ delete(tbl.Fields, "delay")
delete(tbl.Fields, "drop_original")
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
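Since `period` and `delay` arrive as TOML strings and are handed straight to time.ParseDuration, any Go duration literal is accepted in an aggregator's config section. A small standalone sketch of the values that parse (the sample strings below are illustrations, not defaults):

package main

import (
	"fmt"
	"time"
)

func main() {
	// The same parsing the config loader applies to period/delay values.
	for _, s := range []string{"30s", "1m30s", "500ms", "2h"} {
		d, err := time.ParseDuration(s)
		if err != nil {
			fmt.Println(s, "->", err)
			continue
		}
		fmt.Println(s, "->", d)
	}
}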
69 changes: 64 additions & 5 deletions internal/models/running_aggregator.go
@@ -7,8 +7,21 @@ import (
)

type RunningAggregator struct {
- Aggregator telegraf.Aggregator
- Config *AggregatorConfig
+ a telegraf.Aggregator
+ Config *AggregatorConfig
+
+ metrics chan telegraf.Metric
}

+ func NewRunningAggregator(
+ a telegraf.Aggregator,
+ conf *AggregatorConfig,
+ ) *RunningAggregator {
+ return &RunningAggregator{
+ a: a,
+ Config: conf,
+ metrics: make(chan telegraf.Metric, 100),
+ }
+ }

// AggregatorConfig containing configuration parameters for the running
@@ -22,6 +35,9 @@ type AggregatorConfig struct {
MeasurementSuffix string
Tags map[string]string
Filter Filter

+ Period time.Duration
+ Delay time.Duration
}

func (r *RunningAggregator) Name() string {
@@ -56,10 +72,10 @@ func (r *RunningAggregator) MakeMetric(
return m
}

- // Apply applies the given metric to the aggregator.
+ // Add applies the given metric to the aggregator.
// Before applying to the plugin, it will run any defined filters on the metric.
// Apply returns true if the original metric should be dropped.
- func (r *RunningAggregator) Apply(in telegraf.Metric) bool {
+ func (r *RunningAggregator) Add(in telegraf.Metric) bool {
if r.Config.Filter.IsActive() {
// check if the aggregator should apply this metric
name := in.Name()
@@ -74,6 +90,49 @@ func (r *RunningAggregator) Apply(in telegraf.Metric) bool {
in, _ = telegraf.NewMetric(name, tags, fields, t)
}

- r.Aggregator.Apply(in)
+ r.metrics <- in
return r.Config.DropOriginal
}
+ func (r *RunningAggregator) add(in telegraf.Metric) {
+ r.a.Add(in)
+ }
+
+ func (r *RunningAggregator) push(acc telegraf.Accumulator) {
+ r.a.Push(acc)
+ }
+
+ func (r *RunningAggregator) reset() {
+ r.a.Reset()
+ }
+
+ func (r *RunningAggregator) Run(
+ acc telegraf.Accumulator,
+ shutdown chan struct{},
+ ) {
+ if r.Config.Delay == 0 {
+ r.Config.Delay = time.Millisecond * 100
+ }
+ if r.Config.Period == 0 {
+ r.Config.Period = time.Second * 30
+ }
+
+ time.Sleep(r.Config.Delay)
+ periodT := time.NewTicker(r.Config.Period)
+ defer periodT.Stop()
+
+ for {
+ select {
+ case <-shutdown:
+ if len(r.metrics) > 0 {
+ // wait until metrics are flushed before exiting
+ continue
+ }
+ return
+ case m := <-r.metrics:
+ r.add(m)
+ case <-periodT.C:
+ r.push(acc)
+ r.reset()
+ }
+ }
+ }
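The Run loop above is what makes the interface lock-free for plugins: a single goroutine owns the wrapped aggregator, pulling metrics off the buffered channel, pushing and resetting on each period tick, and draining the channel before exiting on shutdown, with defaults of a 100ms delay and 30s period when the config leaves them unset. The standalone sketch below models that select loop with a plain integer sum instead of a telegraf plugin; none of its names are telegraf APIs.

package main

import (
	"fmt"
	"time"
)

func main() {
	metrics := make(chan int, 100) // stands in for RunningAggregator.metrics
	shutdown := make(chan struct{})

	// Producer: feed a few values, then signal shutdown.
	go func() {
		for i := 1; i <= 5; i++ {
			metrics <- i
			time.Sleep(50 * time.Millisecond)
		}
		close(shutdown)
	}()

	sum := 0 // stands in for the plugin's aggregate state
	period := time.NewTicker(100 * time.Millisecond)
	defer period.Stop()

	for {
		select {
		case <-shutdown:
			if len(metrics) > 0 {
				continue // drain buffered values before exiting, like Run does
			}
			fmt.Println("final:", sum)
			return
		case v := <-metrics:
			sum += v // "Add"
		case <-period.C:
			fmt.Println("push:", sum) // "Push"
			sum = 0                   // "Reset"
		}
	}
}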