Skip to content

Commit

Permalink
add support for streaming processors (influxdata#7634)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssoroka authored Jun 5, 2020
1 parent b99e3bc commit 741ea83
Show file tree
Hide file tree
Showing 12 changed files with 833 additions and 499 deletions.
1,012 changes: 611 additions & 401 deletions agent/agent.go

Large diffs are not rendered by default.

6 changes: 1 addition & 5 deletions aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@ package telegraf
// Add, Push, and Reset can not be called concurrently, so locking is not
// required when implementing an Aggregator plugin.
type Aggregator interface {
// SampleConfig returns the default configuration of the Input.
SampleConfig() string

// Description returns a one-sentence description on the Input.
Description() string
PluginDescriber

// Add the metric to the aggregator.
Add(in Metric)
Expand Down
107 changes: 72 additions & 35 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ type Config struct {
Outputs []*models.RunningOutput
Aggregators []*models.RunningAggregator
// Processors have a slice wrapper type because they need to be sorted
Processors models.RunningProcessors
Processors models.RunningProcessors
AggProcessors models.RunningProcessors
}

func NewConfig() *Config {
Expand All @@ -83,6 +84,7 @@ func NewConfig() *Config {
Inputs: make([]*models.RunningInput, 0),
Outputs: make([]*models.RunningOutput, 0),
Processors: make([]*models.RunningProcessor, 0),
AggProcessors: make([]*models.RunningProcessor, 0),
InputFilters: make([]string, 0),
OutputFilters: make([]string, 0),
}
Expand Down Expand Up @@ -561,12 +563,7 @@ func printFilteredGlobalSections(sectionFilters []string) {
}
}

type printer interface {
Description() string
SampleConfig() string
}

func printConfig(name string, p printer, op string, commented bool) {
func printConfig(name string, p telegraf.PluginDescriber, op string, commented bool) {
comment := ""
if commented {
comment = "# "
Expand Down Expand Up @@ -684,24 +681,31 @@ func (c *Config) LoadConfig(path string) error {
}
data, err := loadConfig(path)
if err != nil {
return fmt.Errorf("Error loading %s, %s", path, err)
return fmt.Errorf("Error loading config file %s: %w", path, err)
}

if err = c.LoadConfigData(data); err != nil {
return fmt.Errorf("Error loading config file %s: %w", path, err)
}
return nil
}

// LoadConfigData loads TOML-formatted config data
func (c *Config) LoadConfigData(data []byte) error {
tbl, err := parseConfig(data)
if err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return fmt.Errorf("Error parsing data: %s", err)
}

// Parse tags tables first:
for _, tableName := range []string{"tags", "global_tags"} {
if val, ok := tbl.Fields[tableName]; ok {
subTable, ok := val.(*ast.Table)
if !ok {
return fmt.Errorf("%s: invalid configuration", path)
return fmt.Errorf("invalid configuration, bad table name %q", tableName)
}
if err = toml.UnmarshalTable(subTable, c.Tags); err != nil {
log.Printf("E! Could not parse [global_tags] config\n")
return fmt.Errorf("Error parsing %s, %s", path, err)
return fmt.Errorf("error parsing table name %q: %w", tableName, err)
}
}
}
Expand All @@ -710,11 +714,10 @@ func (c *Config) LoadConfig(path string) error {
if val, ok := tbl.Fields["agent"]; ok {
subTable, ok := val.(*ast.Table)
if !ok {
return fmt.Errorf("%s: invalid configuration", path)
return fmt.Errorf("invalid configuration, error parsing agent table")
}
if err = toml.UnmarshalTable(subTable, c.Agent); err != nil {
log.Printf("E! Could not parse [agent] config\n")
return fmt.Errorf("Error parsing %s, %s", path, err)
return fmt.Errorf("error parsing agent table: %w", err)
}
}

Expand All @@ -735,7 +738,7 @@ func (c *Config) LoadConfig(path string) error {
for name, val := range tbl.Fields {
subTable, ok := val.(*ast.Table)
if !ok {
return fmt.Errorf("%s: invalid configuration", path)
return fmt.Errorf("invalid configuration, error parsing field %q as table", name)
}

switch name {
Expand All @@ -746,17 +749,17 @@ func (c *Config) LoadConfig(path string) error {
// legacy [outputs.influxdb] support
case *ast.Table:
if err = c.addOutput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return fmt.Errorf("Error parsing %s, %s", pluginName, err)
}
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addOutput(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return fmt.Errorf("Error parsing %s array, %s", pluginName, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
return fmt.Errorf("Unsupported config format: %s",
pluginName)
}
}
case "inputs", "plugins":
Expand All @@ -765,17 +768,17 @@ func (c *Config) LoadConfig(path string) error {
// legacy [inputs.cpu] support
case *ast.Table:
if err = c.addInput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return fmt.Errorf("Error parsing %s, %s", pluginName, err)
}
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addInput(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return fmt.Errorf("Error parsing %s, %s", pluginName, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
return fmt.Errorf("Unsupported config format: %s",
pluginName)
}
}
case "processors":
Expand All @@ -784,12 +787,12 @@ func (c *Config) LoadConfig(path string) error {
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addProcessor(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return fmt.Errorf("Error parsing %s, %s", pluginName, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
return fmt.Errorf("Unsupported config format: %s",
pluginName)
}
}
case "aggregators":
Expand All @@ -798,19 +801,19 @@ func (c *Config) LoadConfig(path string) error {
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addAggregator(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return fmt.Errorf("Error parsing %s, %s", pluginName, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
return fmt.Errorf("Unsupported config format: %s",
pluginName)
}
}
// Assume it's an input input for legacy config file support if no other
// identifiers are present
default:
if err = c.addInput(name, subTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
return fmt.Errorf("Error parsing %s, %s", name, err)
}
}
}
Expand Down Expand Up @@ -929,23 +932,50 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
if !ok {
return fmt.Errorf("Undefined but requested processor: %s", name)
}
processor := creator()

processorConfig, err := buildProcessor(name, table)
if err != nil {
return err
}

if err := toml.UnmarshalTable(table, processor); err != nil {
rf, err := c.newRunningProcessor(creator, processorConfig, name, table)
if err != nil {
return err
}
c.Processors = append(c.Processors, rf)

rf := models.NewRunningProcessor(processor, processorConfig)
// save a copy for the aggregator
rf, err = c.newRunningProcessor(creator, processorConfig, name, table)
if err != nil {
return err
}
c.AggProcessors = append(c.AggProcessors, rf)

c.Processors = append(c.Processors, rf)
return nil
}

func (c *Config) newRunningProcessor(
creator processors.StreamingCreator,
processorConfig *models.ProcessorConfig,
name string,
table *ast.Table,
) (*models.RunningProcessor, error) {
processor := creator()

if p, ok := processor.(unwrappable); ok {
if err := toml.UnmarshalTable(table, p.Unwrap()); err != nil {
return nil, err
}
} else {
if err := toml.UnmarshalTable(table, processor); err != nil {
return nil, err
}
}

rf := models.NewRunningProcessor(processor, processorConfig)
return rf, nil
}

func (c *Config) addOutput(name string, table *ast.Table) error {
if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) {
return nil
Expand Down Expand Up @@ -2195,3 +2225,10 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {

return oc, nil
}

// unwrappable lets you retrieve the original telegraf.Processor from the
// StreamingProcessor. This is necessary because the toml Unmarshaller won't
// look inside composed types.
type unwrappable interface {
Unwrap() telegraf.Processor
}
8 changes: 4 additions & 4 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,20 +207,20 @@ func TestConfig_FieldNotDefined(t *testing.T) {
c := NewConfig()
err := c.LoadConfig("./testdata/invalid_field.toml")
require.Error(t, err, "invalid field name")
assert.Equal(t, "Error parsing ./testdata/invalid_field.toml, line 2: field corresponding to `not_a_field' is not defined in http_listener_v2.HTTPListenerV2", err.Error())
assert.Equal(t, "Error loading config file ./testdata/invalid_field.toml: Error parsing http_listener_v2, line 2: field corresponding to `not_a_field' is not defined in http_listener_v2.HTTPListenerV2", err.Error())

}

func TestConfig_WrongFieldType(t *testing.T) {
c := NewConfig()
err := c.LoadConfig("./testdata/wrong_field_type.toml")
require.Error(t, err, "invalid field type")
assert.Equal(t, "Error parsing ./testdata/wrong_field_type.toml, line 2: (http_listener_v2.HTTPListenerV2.Port) cannot unmarshal TOML string into int", err.Error())
assert.Equal(t, "Error loading config file ./testdata/wrong_field_type.toml: Error parsing http_listener_v2, line 2: (http_listener_v2.HTTPListenerV2.Port) cannot unmarshal TOML string into int", err.Error())

c = NewConfig()
err = c.LoadConfig("./testdata/wrong_field_type2.toml")
require.Error(t, err, "invalid field type2")
assert.Equal(t, "Error parsing ./testdata/wrong_field_type2.toml, line 2: (http_listener_v2.HTTPListenerV2.Methods) cannot unmarshal TOML string into []string", err.Error())
assert.Equal(t, "Error loading config file ./testdata/wrong_field_type2.toml: Error parsing http_listener_v2, line 2: (http_listener_v2.HTTPListenerV2.Methods) cannot unmarshal TOML string into []string", err.Error())
}

func TestConfig_InlineTables(t *testing.T) {
Expand Down Expand Up @@ -255,5 +255,5 @@ func TestConfig_BadOrdering(t *testing.T) {
c := NewConfig()
err := c.LoadConfig("./testdata/non_slice_slice.toml")
require.Error(t, err, "bad ordering")
assert.Equal(t, "Error parsing ./testdata/non_slice_slice.toml, line 4: cannot unmarshal TOML array into string (need slice)", err.Error())
assert.Equal(t, "Error loading config file ./testdata/non_slice_slice.toml: Error parsing http array, line 4: cannot unmarshal TOML array into string (need slice)", err.Error())
}
6 changes: 1 addition & 5 deletions input.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package telegraf

type Input interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string

// Description returns a one-sentence description on the Input
Description() string
PluginDescriber

// Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval"
Expand Down
62 changes: 29 additions & 33 deletions models/running_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
type RunningProcessor struct {
sync.Mutex
log telegraf.Logger
Processor telegraf.Processor
Processor telegraf.StreamingProcessor
Config *ProcessorConfig
}

Expand All @@ -28,7 +28,7 @@ type ProcessorConfig struct {
Filter Filter
}

func NewRunningProcessor(processor telegraf.Processor, config *ProcessorConfig) *RunningProcessor {
func NewRunningProcessor(processor telegraf.StreamingProcessor, config *ProcessorConfig) *RunningProcessor {
tags := map[string]string{"processor": config.Name}
if config.Alias != "" {
tags["alias"] = config.Alias
Expand All @@ -52,15 +52,6 @@ func (rp *RunningProcessor) metricFiltered(metric telegraf.Metric) {
metric.Drop()
}

func containsMetric(item telegraf.Metric, metrics []telegraf.Metric) bool {
for _, m := range metrics {
if item == m {
return true
}
}
return false
}

func (r *RunningProcessor) Init() error {
if p, ok := r.Processor.(telegraf.Initializer); ok {
err := p.Init()
Expand All @@ -71,34 +62,39 @@ func (r *RunningProcessor) Init() error {
return nil
}

func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
rp.Lock()
defer rp.Unlock()
func (r *RunningProcessor) Log() telegraf.Logger {
return r.log
}

func (r *RunningProcessor) LogName() string {
return logName("processors", r.Config.Name, r.Config.Alias)
}

ret := []telegraf.Metric{}
func (r *RunningProcessor) MakeMetric(metric telegraf.Metric) telegraf.Metric {
return metric
}

for _, metric := range in {
// In processors when a filter selects a metric it is sent through the
// processor. Otherwise the metric continues downstream unmodified.
if ok := rp.Config.Filter.Select(metric); !ok {
ret = append(ret, metric)
continue
}
func (r *RunningProcessor) Start(acc telegraf.Accumulator) error {
return r.Processor.Start(acc)
}

rp.Config.Filter.Modify(metric)
if len(metric.FieldList()) == 0 {
rp.metricFiltered(metric)
continue
}
func (r *RunningProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) {
if ok := r.Config.Filter.Select(m); !ok {
// pass downstream
acc.AddMetric(m)
return
}

// This metric should pass through the filter, so call the filter Apply
// function and append results to the output slice.
ret = append(ret, rp.Processor.Apply(metric)...)
r.Config.Filter.Modify(m)
if len(m.FieldList()) == 0 {
// drop metric
r.metricFiltered(m)
return
}

return ret
r.Processor.Add(m, acc)
}

func (r *RunningProcessor) Log() telegraf.Logger {
return r.log
func (r *RunningProcessor) Stop() {
r.Processor.Stop()
}
Loading

0 comments on commit 741ea83

Please sign in to comment.