Skip to content

Commit

Permalink
Merge pull request kuskoman#137 from kuskoman/fix-plugin-id-collisions
Browse files Browse the repository at this point in the history
Fix plugin_id collisions
  • Loading branch information
kuskoman authored Jun 16, 2023
2 parents 841f280 + 2ebcfe4 commit 79d685a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 54 deletions.
53 changes: 18 additions & 35 deletions collectors/nodestats/pipeline_subcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package nodestats
import (
"fmt"
"log"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -142,7 +141,7 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipeli

// Output error metrics
for _, output := range pipeStats.Plugins.Outputs {
pluginID := TruncatePluginId(output.ID)
pluginID := output.ID
pluginType := "output"
log.Printf("collecting output error stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, pluginType, output.Name, pluginID)

Expand All @@ -158,39 +157,35 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipeli

// Pipeline plugins metrics
for _, plugin := range pipeStats.Plugins.Inputs {
pluginID := TruncatePluginId(plugin.ID)
pluginType := "input"
log.Printf("collecting pipeline plugin stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsOut, prometheus.CounterValue, float64(plugin.Events.Out), pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsQueuePushDuration, prometheus.CounterValue, float64(plugin.Events.QueuePushDurationInMillis), pipelineID, pluginType, plugin.Name, pluginID)
log.Printf("collecting pipeline plugin stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, pluginType, plugin.Name, plugin.ID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsOut, prometheus.CounterValue, float64(plugin.Events.Out), pipelineID, pluginType, plugin.Name, plugin.ID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsQueuePushDuration, prometheus.CounterValue, float64(plugin.Events.QueuePushDurationInMillis), pipelineID, pluginType, plugin.Name, plugin.ID)
}

for _, plugin := range pipeStats.Plugins.Codecs {
pluginID := TruncatePluginId(plugin.ID)
log.Printf("collecting pipeline plugin stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, "codec", plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsIn, prometheus.CounterValue, float64(plugin.Encode.WritesIn), pipelineID, "codec:encode", plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsIn, prometheus.CounterValue, float64(plugin.Decode.WritesIn), pipelineID, "codec:decode", plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsOut, prometheus.CounterValue, float64(plugin.Decode.Out), pipelineID, "codec:decode", plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsDuration, prometheus.CounterValue, float64(plugin.Encode.DurationInMillis), pipelineID, "codec:encode", plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsDuration, prometheus.CounterValue, float64(plugin.Decode.DurationInMillis), pipelineID, "codec:decode", plugin.Name, pluginID)
log.Printf("collecting pipeline plugin stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, "codec", plugin.Name, plugin.ID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsIn, prometheus.CounterValue, float64(plugin.Encode.WritesIn), pipelineID, "codec:encode", plugin.Name, plugin.ID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsIn, prometheus.CounterValue, float64(plugin.Decode.WritesIn), pipelineID, "codec:decode", plugin.Name, plugin.ID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsOut, prometheus.CounterValue, float64(plugin.Decode.Out), pipelineID, "codec:decode", plugin.Name, plugin.ID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsDuration, prometheus.CounterValue, float64(plugin.Encode.DurationInMillis), pipelineID, "codec:encode", plugin.Name, plugin.ID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsDuration, prometheus.CounterValue, float64(plugin.Decode.DurationInMillis), pipelineID, "codec:decode", plugin.Name, plugin.ID)
}

for _, plugin := range pipeStats.Plugins.Filters {
pluginID := TruncatePluginId(plugin.ID)
pluginType := "filter"
log.Printf("collecting pipeline plugin stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsIn, prometheus.CounterValue, float64(plugin.Events.In), pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsOut, prometheus.CounterValue, float64(plugin.Events.Out), pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsDuration, prometheus.CounterValue, float64(plugin.Events.DurationInMillis), pipelineID, pluginType, plugin.Name, pluginID)
log.Printf("collecting pipeline plugin stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, pluginType, plugin.Name, plugin.ID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsIn, prometheus.CounterValue, float64(plugin.Events.In), pipelineID, pluginType, plugin.Name, plugin.ID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsOut, prometheus.CounterValue, float64(plugin.Events.Out), pipelineID, pluginType, plugin.Name, plugin.ID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsDuration, prometheus.CounterValue, float64(plugin.Events.DurationInMillis), pipelineID, pluginType, plugin.Name, plugin.ID)
}

for _, plugin := range pipeStats.Plugins.Outputs {
pluginID := TruncatePluginId(plugin.ID)
pluginType := "output"
log.Printf("collecting pipeline plugin stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsIn, prometheus.CounterValue, float64(plugin.Events.In), pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsOut, prometheus.CounterValue, float64(plugin.Events.Out), pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsDuration, prometheus.CounterValue, float64(plugin.Events.DurationInMillis), pipelineID, pluginType, plugin.Name, pluginID)
log.Printf("collecting pipeline plugin stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, pluginType, plugin.Name, plugin.ID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsIn, prometheus.CounterValue, float64(plugin.Events.In), pipelineID, pluginType, plugin.Name, plugin.ID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsOut, prometheus.CounterValue, float64(plugin.Events.Out), pipelineID, pluginType, plugin.Name, plugin.ID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsDuration, prometheus.CounterValue, float64(plugin.Events.DurationInMillis), pipelineID, pluginType, plugin.Name, plugin.ID)
}

collectingEnd := time.Now()
Expand Down Expand Up @@ -227,15 +222,3 @@ func (collector *PipelineSubcollector) isPipelineHealthy(pipeReloadStats respons

return CollectorHealthy
}

// Plugins have non-unique names, so use both name and id as labels
// By default ids are a 36-char UUID, optionally prefixed the a plugin type, or a 64-char SHA256 hash
// If the id is set by the user, keep it. If it's a UUID, truncate it to the last 8 chars (1% chance of collision per 9291)
func TruncatePluginId(pluginID string) string {
// If the pluginId is < 32 chars, it's likely a user-defined id.
if len(pluginID) < 32 {
return pluginID
}
noDashes := strings.Replace(pluginID, "-", "", -1)
return noDashes[len(noDashes)-8:]
}
19 changes: 0 additions & 19 deletions collectors/nodestats/pipeline_subcollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,3 @@ func TestIsPipelineHealthy(t *testing.T) {
})
}
}

func TestTruncatePluginId(t *testing.T) {
testCases := []struct {
input string
output string
}{
{"plain_2c897236-b1fd-42e6-ab7a-f468-b6e6-e404", "b6e6e404"},
{"552b7810244be6259a4cc88fe34833088a23437c5ee9b4c788b2ec4e502c819f", "502c819f"},
{"pipeline_custom_filter_foobar", "pipeline_custom_filter_foobar"},
{"filter_0001", "filter_0001"},
}

for _, tc := range testCases {
got := TruncatePluginId(tc.input)
if got != tc.output {
t.Errorf("TruncatePluginId(%v) = %v; want %v", tc.input, got, tc.output)
}
}
}

0 comments on commit 79d685a

Please sign in to comment.