Skip to content

Commit

Permalink
Add plugin input filter output metrics (kuskoman#101)
Browse files Browse the repository at this point in the history
* Makes NewDescWithHelpAndLabels() variadic for labels

* Rename NewDescWithHelpAndLabel()

* Adds pipeline input/filters/output metrics

* Adds duplicate filters to logstash.conf, node_stats fixture

* Fixes duplicate plugin-name by adding id

* Updates snaps with added duplicate plugin

* Update snapshot, expected test metric names

* Cleans up plugin metrics, using more labels

* Adds ids to logstash.conf filters

* Fix duped _pipelines in metric name

* Added missing reload timestamps to README
  • Loading branch information
excalq authored Apr 26, 2023
1 parent 5683d2a commit f94dbb0
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 23 deletions.
27 changes: 27 additions & 0 deletions .docker/logstash.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,40 @@ input {
type => "dummy"
count => -1
message => '{"message": "dummy log"}'
codec => plain {id => "plain-codec-001"}
}
}

filter {
json {
id => "json-filter"
source => "message"
}
# There are too many of these. Drop 80% of them.
if [message][message] == "dummy log" {
drop {
id => "drop-80-percent"
percentage => 80
}
}
if [massage][non_existent] {
drop {
id => "drop-non-existent"
}
}
mutate {
id => "mutate-path-001"
gsub => [
"[url][path]", "/([^/]+).*", "\1"
]
add_field => { "[oject][foo]" => "bar" }
}

# Don't keep this duplicate payload
prune {
id => "prune-http-input-fields"
blacklist_names => [ "event", "host", "http", "url", "user_agent" ]
}
}

output {
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,17 @@ Table of exported metrics:
| logstash_stats_pipeline_events_in | counter | Number of events that have been inputted into this pipeline. |
| logstash_stats_pipeline_events_out | counter | Number of events that have been processed by this pipeline. |
| logstash_stats_pipeline_events_queue_push_duration | counter | Time needed to push event to queue. |
| logstash_stats_pipeline_plugin_events_duration | counter | Time spent processing events in this plugin. |
| logstash_stats_pipeline_plugin_events_in | counter | Number of events received by this plugin. |
| logstash_stats_pipeline_plugin_events_out | counter | Number of events output by this plugin. |
| logstash_stats_pipeline_plugin_events_queue_push_duration | counter | Time spent pushing events into the input queue. |
| logstash_stats_pipeline_queue_events_count | counter | Number of events in the queue. |
| logstash_stats_pipeline_queue_events_queue_size | counter | Number of events that the queue can accommodate |
| logstash_stats_pipeline_queue_max_size_in_bytes | counter | Maximum size of given queue in bytes. |
| logstash_stats_pipeline_reloads_failures | counter | Number of failed pipeline reloads. |
| logstash_stats_pipeline_reloads_successes | counter | Number of successful pipeline reloads. |
| logstash_stats_pipeline_reloads_last_failure_timestamp | gauge | Timestamp of last failed pipeline reload. |
| logstash_stats_pipeline_reloads_last_success_timestamp | gauge | Timestamp of last successful pipeline reload. |
| logstash_stats_pipeline_reloads_successes | counter | Number of successful pipeline reloads. |
| logstash_stats_pipeline_up | gauge | Whether the pipeline is up or not. |
| logstash_stats_process_cpu_load_average_1m | gauge | Total 1m system load average. |
| logstash_stats_process_cpu_load_average_5m | gauge | Total 5m system load average. |
| logstash_stats_process_cpu_load_average_15m | gauge | Total 15m system load average. |
Expand Down
10 changes: 5 additions & 5 deletions collectors/nodestats/nodestats_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ func NewNodestatsCollector(client logstashclient.Client) *NodestatsCollector {
JvmMemHeapUsedBytes: descHelper.NewDescWithHelp("jvm_mem_heap_used_bytes", "Amount of used heap memory in bytes."),
JvmMemNonHeapCommittedBytes: descHelper.NewDescWithHelp("jvm_mem_non_heap_committed_bytes", "Amount of non-heap memory in bytes that is committed for the Java virtual machine to use."),

JvmMemPoolPeakUsedInBytes: descHelper.NewDescWithHelpAndLabel(
JvmMemPoolPeakUsedInBytes: descHelper.NewDescWithHelpAndLabels(
"jvm_mem_pool_peak_used_bytes", "Peak used bytes of a given JVM memory pool.", "pool"),
JvmMemPoolUsedInBytes: descHelper.NewDescWithHelpAndLabel(
JvmMemPoolUsedInBytes: descHelper.NewDescWithHelpAndLabels(
"jvm_mem_pool_used_bytes", "Currently used bytes of a given JVM memory pool.", "pool"),
JvmMemPoolPeakMaxInBytes: descHelper.NewDescWithHelpAndLabel(
JvmMemPoolPeakMaxInBytes: descHelper.NewDescWithHelpAndLabels(
"jvm_mem_pool_peak_max_bytes", "Highest value of bytes that were used in a given JVM memory pool.", "pool"),
JvmMemPoolMaxInBytes: descHelper.NewDescWithHelpAndLabel(
JvmMemPoolMaxInBytes: descHelper.NewDescWithHelpAndLabels(
"jvm_mem_pool_max_bytes", "Maximum amount of bytes that can be used in a given JVM memory pool.", "pool"),
JvmMemPoolCommittedInBytes: descHelper.NewDescWithHelpAndLabel(
JvmMemPoolCommittedInBytes: descHelper.NewDescWithHelpAndLabels(
"jvm_mem_pool_committed_bytes", "Amount of bytes that are committed for the Java virtual machine to use in a given JVM memory pool.", "pool"),

JvmUptimeMillis: descHelper.NewDescWithHelp("jvm_uptime_millis", "Uptime of the JVM in milliseconds."),
Expand Down
23 changes: 23 additions & 0 deletions collectors/nodestats/nodestats_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func TestCollectNotNil(t *testing.T) {
"logstash_stats_pipeline_reloads_successes",
"logstash_stats_pipeline_reloads_last_success_timestamp",
"logstash_stats_pipeline_reloads_last_failure_timestamp",
"logstash_stats_pipeline_plugin_events_in",
"logstash_stats_pipeline_plugin_events_out",
"logstash_stats_pipeline_plugin_events_duration",
"logstash_stats_pipeline_plugin_events_queue_push_duration",
"logstash_stats_process_cpu_percent",
"logstash_stats_process_cpu_total_millis",
"logstash_stats_process_cpu_load_average_1m",
Expand Down Expand Up @@ -147,3 +151,22 @@ func TestCollectError(t *testing.T) {
t.Error("Expected err not to be nil")
}
}

// TestTruncatePluginId verifies that machine-generated plugin IDs (UUIDs and
// SHA256 hashes) are shortened to their last 8 hex characters, while shorter,
// user-defined IDs are passed through unchanged.
func TestTruncatePluginId(t *testing.T) {
	cases := map[string]string{
		"plain_2c897236-b1fd-42e6-ab7a-f468-b6e6-e404":                     "b6e6e404",
		"552b7810244be6259a4cc88fe34833088a23437c5ee9b4c788b2ec4e502c819f": "502c819f",
		"pipeline_custom_filter_foobar":                                    "pipeline_custom_filter_foobar",
		"filter_0001":                                                      "filter_0001",
	}

	for in, want := range cases {
		got := TruncatePluginId(in)
		if got != want {
			t.Errorf("TruncatePluginId(%v) = %v; want %v", in, got, want)
		}
	}
}
85 changes: 73 additions & 12 deletions collectors/nodestats/pipeline_subcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nodestats
import (
"fmt"
"log"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -29,26 +30,37 @@ type PipelineSubcollector struct {
QueueEventsCount *prometheus.Desc
QueueEventsQueueSize *prometheus.Desc
QueueMaxQueueSizeInBytes *prometheus.Desc

PipelinePluginEventsIn *prometheus.Desc
PipelinePluginEventsOut *prometheus.Desc
PipelinePluginEventsDuration *prometheus.Desc
PipelinePluginEventsQueuePushDuration *prometheus.Desc
}

func NewPipelineSubcollector() *PipelineSubcollector {
descHelper := prometheus_helper.SimpleDescHelper{Namespace: namespace, Subsystem: fmt.Sprintf("%s_pipeline", subsystem)}
return &PipelineSubcollector{
EventsOut: descHelper.NewDescWithHelpAndLabel("events_out", "Number of events that have been processed by this pipeline.", "pipeline_id"),
EventsFiltered: descHelper.NewDescWithHelpAndLabel("events_filtered", "Number of events that have been filtered out by this pipeline.", "pipeline_id"),
EventsIn: descHelper.NewDescWithHelpAndLabel("events_in", "Number of events that have been inputted into this pipeline.", "pipeline_id"),
EventsDuration: descHelper.NewDescWithHelpAndLabel("events_duration", "Time needed to process event.", "pipeline_id"),
EventsQueuePushDuration: descHelper.NewDescWithHelpAndLabel("events_queue_push_duration", "Time needed to push event to queue.", "pipeline_id"),
EventsOut: descHelper.NewDescWithHelpAndLabels("events_out", "Number of events that have been processed by this pipeline.", "pipeline"),
EventsFiltered: descHelper.NewDescWithHelpAndLabels("events_filtered", "Number of events that have been filtered out by this pipeline.", "pipeline"),
EventsIn: descHelper.NewDescWithHelpAndLabels("events_in", "Number of events that have been inputted into this pipeline.", "pipeline"),
EventsDuration: descHelper.NewDescWithHelpAndLabels("events_duration", "Time needed to process event.", "pipeline"),
EventsQueuePushDuration: descHelper.NewDescWithHelpAndLabels("events_queue_push_duration", "Time needed to push event to queue.", "pipeline"),

ReloadsSuccesses: descHelper.NewDescWithHelpAndLabels("reloads_successes", "Number of successful pipeline reloads.", "pipeline"),
ReloadsFailures: descHelper.NewDescWithHelpAndLabels("reloads_failures", "Number of failed pipeline reloads.", "pipeline"),

ReloadsSuccesses: descHelper.NewDescWithHelpAndLabel("reloads_successes", "Number of successful pipeline reloads.", "pipeline_id"),
ReloadsFailures: descHelper.NewDescWithHelpAndLabel("reloads_failures", "Number of failed pipeline reloads.", "pipeline_id"),
ReloadsLastSuccessTimestamp: descHelper.NewDescWithHelpAndLabels("reloads_last_success_timestamp", "Timestamp of last successful pipeline reload.", "pipeline"),
ReloadsLastFailureTimestamp: descHelper.NewDescWithHelpAndLabels("reloads_last_failure_timestamp", "Timestamp of last failed pipeline reload.", "pipeline"),

ReloadsLastSuccessTimestamp: descHelper.NewDescWithHelpAndLabel("reloads_last_success_timestamp", "Timestamp of last successful pipeline reload.", "pipeline_id"),
ReloadsLastFailureTimestamp: descHelper.NewDescWithHelpAndLabel("reloads_last_failure_timestamp", "Timestamp of last failed pipeline reload.", "pipeline_id"),
QueueEventsCount: descHelper.NewDescWithHelpAndLabels("queue_events_count", "Number of events in the queue.", "pipeline"),
QueueEventsQueueSize: descHelper.NewDescWithHelpAndLabels("queue_events_queue_size", "Number of events that the queue can accommodate", "pipeline"),
QueueMaxQueueSizeInBytes: descHelper.NewDescWithHelpAndLabels("queue_max_size_in_bytes", "Maximum size of given queue in bytes.", "pipeline"),

PipelinePluginEventsIn: descHelper.NewDescWithHelpAndLabels("plugin_events_in", "Number of events received this pipeline.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsOut: descHelper.NewDescWithHelpAndLabels("plugin_events_out", "Number of events output by this pipeline.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_duration", "Time spent processing events in this plugin.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsQueuePushDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_queue_push_duration", "Time spent pushing events into the input queue.", "pipeline", "plugin_type", "plugin", "plugin_id"),

QueueEventsCount: descHelper.NewDescWithHelpAndLabel("queue_events_count", "Number of events in the queue.", "pipeline_id"),
QueueEventsQueueSize: descHelper.NewDescWithHelpAndLabel("queue_events_queue_size", "Number of events that the queue can accommodate", "pipeline_id"),
QueueMaxQueueSizeInBytes: descHelper.NewDescWithHelpAndLabel("queue_max_size_in_bytes", "Maximum size of given queue in bytes.", "pipeline_id"),
}
}

Expand Down Expand Up @@ -76,6 +88,55 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipeli
ch <- prometheus.MustNewConstMetric(collector.QueueEventsQueueSize, prometheus.CounterValue, float64(pipeStats.Queue.QueueSizeInBytes), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.QueueMaxQueueSizeInBytes, prometheus.CounterValue, float64(pipeStats.Queue.MaxQueueSizeInBytes), pipelineID)

// Pipeline plugins metrics
for _, plugin := range pipeStats.Plugins.Inputs {
pluginID := TruncatePluginId(plugin.ID)
pluginType := "input"
log.Printf("collecting pipeline plugin stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsOut, prometheus.CounterValue, float64(plugin.Events.Out), pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsQueuePushDuration, prometheus.CounterValue, float64(plugin.Events.QueuePushDurationInMillis), pipelineID, pluginType, plugin.Name, pluginID)
}

for _, plugin := range pipeStats.Plugins.Codecs {
pluginID := TruncatePluginId(plugin.ID)
log.Printf("collecting pipeline plugin stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, "codec", plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsIn, prometheus.CounterValue, float64(plugin.Encode.WritesIn), pipelineID, "codec:encode", plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsIn, prometheus.CounterValue, float64(plugin.Decode.WritesIn), pipelineID, "codec:decode", plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsOut, prometheus.CounterValue, float64(plugin.Decode.Out), pipelineID, "codec:decode", plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsDuration, prometheus.CounterValue, float64(plugin.Encode.DurationInMillis), pipelineID, "codec:encode", plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsDuration, prometheus.CounterValue, float64(plugin.Decode.DurationInMillis), pipelineID, "codec:decode", plugin.Name, pluginID)
}

for _, plugin := range pipeStats.Plugins.Filters {
pluginID := TruncatePluginId(plugin.ID)
pluginType := "filter"
log.Printf("collecting pipeline plugin stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsIn, prometheus.CounterValue, float64(plugin.Events.Out), pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsOut, prometheus.CounterValue, float64(plugin.Events.Out), pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsDuration, prometheus.CounterValue, float64(plugin.Events.DurationInMillis), pipelineID, pluginType, plugin.Name, pluginID)
}

for _, plugin := range pipeStats.Plugins.Outputs {
pluginID := TruncatePluginId(plugin.ID)
pluginType := "output"
log.Printf("collecting pipeline plugin stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsIn, prometheus.CounterValue, float64(plugin.Events.In), pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsOut, prometheus.CounterValue, float64(plugin.Events.Out), pipelineID, pluginType, plugin.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginEventsDuration, prometheus.CounterValue, float64(plugin.Events.DurationInMillis), pipelineID, pluginType, plugin.Name, pluginID)
}

collectingEnd := time.Now()
log.Printf("collected pipeline stats for pipeline %s in %s", pipelineID, collectingEnd.Sub(collectingStart))
}

// TruncatePluginId shortens a machine-generated plugin ID to its last 8
// characters so it stays readable as a metric label value.
//
// Plugins have non-unique names, so both name and id are used as labels.
// By default ids are a 36-char UUID, optionally prefixed by a plugin type,
// or a 64-char SHA256 hash. If the id is set by the user (heuristically,
// shorter than 32 chars) it is kept verbatim; otherwise dashes are stripped
// and the last 8 chars are returned (1% chance of collision per 9291 ids).
func TruncatePluginId(pluginID string) string {
	// If the pluginID is < 32 chars, it's likely a user-defined id; keep it.
	if len(pluginID) < 32 {
		return pluginID
	}
	noDashes := strings.ReplaceAll(pluginID, "-", "")
	// Defensive: an unusually dash-heavy id could shrink below 8 chars after
	// stripping; return whatever remains rather than slicing out of range.
	if len(noDashes) <= 8 {
		return noDashes
	}
	return noDashes[len(noDashes)-8:]
}
10 changes: 10 additions & 0 deletions fetcher/responses/__snapshots__/nodestats_response_test.snap
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ responses.NodeStatsResponse{
Name: "json",
Events: struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" }{Out:2000, In:2000, DurationInMillis:716},
},
{
ID: "drop_b8ed8ea8c0ace91d6b617f6c8a5153141183c35a330de014182825dbceeade00",
Name: "drop",
Events: struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" }{Out:300, In:330, DurationInMillis:333},
},
{
ID: "drop_e2e0f559b7292f788693f9f318185d5c1d30127870ca8f0e608b11d9dc560079",
Name: "drop",
Events: struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" }{Out:800, In:880, DurationInMillis:888},
},
},
Outputs: {
{
Expand Down
18 changes: 18 additions & 0 deletions fixtures/node_stats.json
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,24 @@
"in": 2000,
"duration_in_millis": 716
}
},
{
"id": "drop_b8ed8ea8c0ace91d6b617f6c8a5153141183c35a330de014182825dbceeade00",
"name": "drop",
"events": {
"out": 300,
"in": 330,
"duration_in_millis": 333
}
},
{
"id": "drop_e2e0f559b7292f788693f9f318185d5c1d30127870ca8f0e608b11d9dc560079",
"name": "drop",
"events": {
"out": 800,
"in": 880,
"duration_in_millis": 888
}
}
],
"outputs": [
Expand Down
4 changes: 2 additions & 2 deletions prometheus_helper/prometheus_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func (h *SimpleDescHelper) NewDescWithHelp(name string, help string) *prometheus

// NewDescWithHelpAndLabels creates a new prometheus.Desc with the namespace and subsystem.
// Labels are used to differentiate between different sources of the same metric.
func (h *SimpleDescHelper) NewDescWithHelpAndLabel(name, help, label string) *prometheus.Desc {
return prometheus.NewDesc(prometheus.BuildFQName(h.Namespace, h.Subsystem, name), help, []string{label}, nil)
// NewDescWithHelpAndLabels builds a metric descriptor with the helper's
// namespace/subsystem prefix, the given help text, and zero or more variable
// label names used to differentiate sources of the same metric.
func (h *SimpleDescHelper) NewDescWithHelpAndLabels(name string, help string, labels ...string) *prometheus.Desc {
	fqName := prometheus.BuildFQName(h.Namespace, h.Subsystem, name)
	return prometheus.NewDesc(fqName, help, labels, nil)
}

// ExtractFqName extracts the fqName from a prometheus.Desc string.
Expand Down
4 changes: 2 additions & 2 deletions prometheus_helper/prometheus_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestSimpleDescHelper(t *testing.T) {
})

t.Run("NewDescWithHelpAndLabel", func(t *testing.T) {
desc := helper.NewDescWithHelpAndLabel("metric", "help", "customLabel")
desc := helper.NewDescWithHelpAndLabels("metric", "help", "customLabel")
expectedDesc := "Desc{fqName: \"logstash_exporter_test_metric\", help: \"help\", constLabels: {}, variableLabels: [{customLabel <nil>}]}"
if desc.String() != expectedDesc {
t.Errorf("incorrect metric description, expected %s but got %s", expectedDesc, desc.String())
Expand All @@ -43,7 +43,7 @@ func TestExtractFqdnName(t *testing.T) {

descriptors := []*prometheus.Desc{
helper.NewDescWithHelp(metricSubname, "help"),
helper.NewDescWithHelpAndLabel(metricSubname, "help", "label"),
helper.NewDescWithHelpAndLabels(metricSubname, "help", "label"),
}

for _, desc := range descriptors {
Expand Down
4 changes: 4 additions & 0 deletions scripts/snapshots/metric_names.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,7 @@ logstash_stats_jvm_mem_pool_max_bytes
logstash_stats_jvm_mem_pool_peak_max_bytes
logstash_stats_jvm_mem_pool_peak_used_bytes
logstash_stats_jvm_mem_pool_used_bytes
logstash_stats_pipeline_plugin_events_duration
logstash_stats_pipeline_plugin_events_in
logstash_stats_pipeline_plugin_events_out
logstash_stats_pipeline_plugin_events_queue_push_duration

0 comments on commit f94dbb0

Please sign in to comment.