Skip to content

Commit

Permalink
Merge pull request kuskoman#138 from kuskoman/flow-properties
Browse files Browse the repository at this point in the history
Parse flow and event metrics, collect event metrics
  • Loading branch information
kuskoman authored Jun 17, 2023
2 parents 31018a4 + 17ea0ae commit b197542
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 27 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,11 @@ Table of exported metrics:
| logstash_info_pipeline_workers | counter | Number of worker threads that will process pipeline events. |
| logstash_info_status | counter | A metric with a constant '1' value labeled by status. |
| logstash_info_up | gauge | A metric that returns 1 if the node is up, 0 otherwise. |
| logstash_stats_events_duration_millis | gauge | Duration of events processing in milliseconds. |
| logstash_stats_events_filtered | gauge | Number of events filtered out. |
| logstash_stats_events_in | gauge | Number of events received. |
| logstash_stats_events_out | gauge | Number of events out. |
| logstash_stats_events_queue_push_duration_millis | gauge | Duration of events push to queue in milliseconds. |
| logstash_stats_jvm_mem_heap_committed_bytes | gauge | Amount of heap memory in bytes that is committed for the Java virtual machine to use. |
| logstash_stats_jvm_mem_heap_max_bytes | gauge | Maximum amount of heap memory in bytes that can be used for memory management. |
| logstash_stats_jvm_mem_heap_used_bytes | gauge | Amount of used heap memory in bytes. |
Expand Down
18 changes: 18 additions & 0 deletions collectors/nodestats/nodestats_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ type NodestatsCollector struct {
ReloadFailures *prometheus.Desc

QueueEventsCount *prometheus.Desc

EventsIn *prometheus.Desc
EventsFiltered *prometheus.Desc
EventsOut *prometheus.Desc
EventsDurationInMillis *prometheus.Desc
EventsQueuePushDurationInMillis *prometheus.Desc
}

func NewNodestatsCollector(client logstashclient.Client) *NodestatsCollector {
Expand Down Expand Up @@ -97,6 +103,12 @@ func NewNodestatsCollector(client logstashclient.Client) *NodestatsCollector {
ReloadFailures: descHelper.NewDescWithHelp("reload_failures", "Number of failed reloads."),

QueueEventsCount: descHelper.NewDescWithHelp("queue_events_count", "Number of events in the queue."),

EventsIn: descHelper.NewDescWithHelp("events_in", "Number of events received."),
EventsFiltered: descHelper.NewDescWithHelp("events_filtered", "Number of events filtered out."),
EventsOut: descHelper.NewDescWithHelp("events_out", "Number of events out."),
EventsDurationInMillis: descHelper.NewDescWithHelp("events_duration_millis", "Duration of events processing in milliseconds."),
EventsQueuePushDurationInMillis: descHelper.NewDescWithHelp("events_queue_push_duration_millis", "Duration of events push to queue in milliseconds."),
}
}

Expand Down Expand Up @@ -151,6 +163,12 @@ func (c *NodestatsCollector) Collect(ctx context.Context, ch chan<- prometheus.M

ch <- prometheus.MustNewConstMetric(c.QueueEventsCount, prometheus.GaugeValue, float64(nodeStats.Queue.EventsCount))

ch <- prometheus.MustNewConstMetric(c.EventsIn, prometheus.GaugeValue, float64(nodeStats.Events.In))
ch <- prometheus.MustNewConstMetric(c.EventsFiltered, prometheus.GaugeValue, float64(nodeStats.Events.Filtered))
ch <- prometheus.MustNewConstMetric(c.EventsOut, prometheus.GaugeValue, float64(nodeStats.Events.Out))
ch <- prometheus.MustNewConstMetric(c.EventsDurationInMillis, prometheus.GaugeValue, float64(nodeStats.Events.DurationInMillis))
ch <- prometheus.MustNewConstMetric(c.EventsQueuePushDurationInMillis, prometheus.GaugeValue, float64(nodeStats.Events.QueuePushDurationInMillis))

for pipelineId, pipelineStats := range nodeStats.Pipelines {
c.pipelineSubcollector.Collect(&pipelineStats, pipelineId, ch)
}
Expand Down
18 changes: 13 additions & 5 deletions fetcher/responses/__snapshots__/nodestats_response_test.snap
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ responses.NodeStatsResponse{
LoadAverage: struct { OneM float64 "json:\"1m\""; FiveM float64 "json:\"5m\""; FifteenM float64 "json:\"15m\"" }{OneM:3.79, FiveM:1.29, FifteenM:0.46},
},
},
Events: responses.EventsResponse{In:4001, Filtered:10, Out:2, DurationInMillis:5, QueuePushDurationInMillis:7},
Flow: responses.FlowResponse{
InputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:1, Lifetime:117.4},
FilterThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:2.1, Lifetime:3.2},
OutputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:4.3, Lifetime:5.4},
QueueBackpressure: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:6.5, Lifetime:7.6},
WorkerConcurrency: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:8.7, Lifetime:9.8},
},
Reloads: responses.ReloadResponse{},
Os: responses.OsResponse{
Cgroup: struct { Cpu struct { CfsPeriodMicros int64 "json:\"cfs_period_micros\""; CfsQuotaMicros int64 "json:\"cfs_quota_micros\""; Stat struct { TimeThrottledNanos int64 "json:\"time_throttled_nanos\""; NumberOfTimesThrottled int64 "json:\"number_of_times_throttled\""; NumberOfElapsedPeriods int64 "json:\"number_of_elapsed_periods\"" } "json:\"stat\""; ControlGroup string "json:\"control_group\"" } "json:\"cpu\""; Cpuacct struct { UsageNanos int64 "json:\"usage_nanos\""; ControlGroup string "json:\"control_group\"" } "json:\"cpuacct\"" }{
Expand Down Expand Up @@ -83,11 +91,11 @@ responses.NodeStatsResponse{
Monitoring: responses.PipelineLogstashMonitoringResponse{},
Events: struct { Out int "json:\"out\""; Filtered int "json:\"filtered\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" }{Out:0, Filtered:0, In:4001, DurationInMillis:0, QueuePushDurationInMillis:0},
Flow: responses.FlowResponse{
InputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0, Lifetime:124},
FilterThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{},
OutputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{},
QueueBackpressure: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{},
WorkerConcurrency: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{},
InputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:6.7, Lifetime:124},
FilterThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:7.8, Lifetime:8.9},
OutputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:2.3, Lifetime:3.4},
QueueBackpressure: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:1.1, Lifetime:2.2},
WorkerConcurrency: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:4.5, Lifetime:5.6},
},
Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\"" } "json:\"outputs\"" }{
Inputs: {
Expand Down
2 changes: 2 additions & 0 deletions fetcher/responses/nodestats_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ type NodeStatsResponse struct {
Pipeline PipelineResponse `json:"pipeline"`
Jvm JvmResponse `json:"jvm"`
Process ProcessResponse `json:"process"`
Events EventsResponse `json:"events"`
Flow FlowResponse `json:"flow"`
Reloads ReloadResponse `json:"reloads"`
Os OsResponse `json:"os"`
Queue QueueResponse `json:"queue"`
Expand Down
44 changes: 22 additions & 22 deletions fixtures/node_stats.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,31 @@
},
"events": {
"in": 4001,
"filtered": 0,
"out": 0,
"duration_in_millis": 0,
"queue_push_duration_in_millis": 0
"filtered": 10,
"out": 2,
"duration_in_millis": 5,
"queue_push_duration_in_millis": 7
},
"flow": {
"input_throughput": {
"current": 0.0,
"current": 1.0,
"lifetime": 117.4
},
"filter_throughput": {
"current": 0.0,
"lifetime": 0.0
"current": 2.1,
"lifetime": 3.2
},
"output_throughput": {
"current": 0.0,
"lifetime": 0.0
"current": 4.3,
"lifetime": 5.4
},
"queue_backpressure": {
"current": 0.0,
"lifetime": 0.0
"current": 6.5,
"lifetime": 7.6
},
"worker_concurrency": {
"current": 0.0,
"lifetime": 0.0
"current": 8.7,
"lifetime": 9.8
}
},
"pipelines": {
Expand All @@ -119,24 +119,24 @@
},
"flow": {
"output_throughput": {
"current": 0.0,
"lifetime": 0.0
"current": 2.3,
"lifetime": 3.4
},
"worker_concurrency": {
"current": 0.0,
"lifetime": 0.0
"current": 4.5,
"lifetime": 5.6
},
"input_throughput": {
"current": 0.0,
"current": 6.7,
"lifetime": 124.0
},
"filter_throughput": {
"current": 0.0,
"lifetime": 0.0
"current": 7.8,
"lifetime": 8.9
},
"queue_backpressure": {
"current": 0.0,
"lifetime": 0.0
"current": 1.1,
"lifetime": 2.2
}
},
"plugins": {
Expand Down
5 changes: 5 additions & 0 deletions scripts/snapshots/metric_names.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,8 @@ logstash_stats_pipeline_dead_letter_queue_dropped_events
logstash_stats_pipeline_dead_letter_queue_expired_events
logstash_stats_pipeline_dead_letter_queue_max_size_in_bytes
logstash_stats_pipeline_dead_letter_queue_size_in_bytes
logstash_stats_events_duration_millis
logstash_stats_events_filtered
logstash_stats_events_in
logstash_stats_events_out
logstash_stats_events_queue_push_duration_millis

0 comments on commit b197542

Please sign in to comment.