Skip to content

Commit

Permalink
Add logstash_stats_pipeline_flow_worker_ metrics, fix other flow metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
kuskoman committed Oct 9, 2024
1 parent 771ee8b commit ac2b46c
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 28 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ The application can be configured using the following environment variables, whi
| `LOG_LEVEL` | [Log level](https://pkg.go.dev/golang.org/x/exp/slog#Level) (defaults to "info" if not set) | `""` (empty string) |
| `HTTP_TIMEOUT`| Timeout for HTTP requests to Logstash API in [Go duration format](https://golang.org/pkg/time/#ParseDuration) | `2s` |


All configuration variables can be checked in the [config directory](./config/).

## Building
Expand Down
26 changes: 16 additions & 10 deletions collectors/nodestats/pipeline_subcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type PipelineSubcollector struct {
FlowQueueBackpressureLifetime *prometheus.Desc
FlowWorkerConcurrencyCurrent *prometheus.Desc
FlowWorkerConcurrencyLifetime *prometheus.Desc
FlowWorkerUtilizationCurrent *prometheus.Desc
FlowWorkerUtilizationLifetime *prometheus.Desc

DeadLetterQueueMaxSizeInBytes *prometheus.Desc
DeadLetterQueueSizeInBytes *prometheus.Desc
Expand Down Expand Up @@ -102,6 +104,8 @@ func NewPipelineSubcollector() *PipelineSubcollector {
FlowQueueBackpressureLifetime: descHelper.NewDescWithHelpAndLabels("flow_queue_backpressure_lifetime", "Lifetime number of events in the backpressure queue.", "pipeline"),
FlowWorkerConcurrencyCurrent: descHelper.NewDescWithHelpAndLabels("flow_worker_concurrency_current", "Current number of workers.", "pipeline"),
FlowWorkerConcurrencyLifetime: descHelper.NewDescWithHelpAndLabels("flow_worker_concurrency_lifetime", "Lifetime number of workers.", "pipeline"),
FlowWorkerUtilizationCurrent: descHelper.NewDescWithHelpAndLabels("flow_worker_utilization_current", "Current worker utilization.", "pipeline"),
FlowWorkerUtilizationLifetime: descHelper.NewDescWithHelpAndLabels("flow_worker_utilization_lifetime", "Lifetime worker utilization.", "pipeline"),

DeadLetterQueueMaxSizeInBytes: descHelper.NewDescWithHelpAndLabels("dead_letter_queue_max_size_in_bytes", "Maximum size of the dead letter queue in bytes.", "pipeline"),
DeadLetterQueueSizeInBytes: descHelper.NewDescWithHelpAndLabels("dead_letter_queue_size_in_bytes", "Current size of the dead letter queue in bytes.", "pipeline"),
Expand Down Expand Up @@ -137,16 +141,18 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipeli
ch <- prometheus.MustNewConstMetric(collector.QueueMaxQueueSizeInBytes, prometheus.CounterValue, float64(pipeStats.Queue.MaxQueueSizeInBytes), pipelineID)

flowStats := pipeStats.Flow
ch <- prometheus.MustNewConstMetric(collector.FlowInputCurrent, prometheus.GaugeValue, float64(flowStats.InputThroughput.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowInputLifetime, prometheus.CounterValue, float64(flowStats.InputThroughput.Lifetime), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowFilterCurrent, prometheus.GaugeValue, float64(flowStats.FilterThroughput.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowFilterLifetime, prometheus.CounterValue, float64(flowStats.FilterThroughput.Lifetime), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowOutputCurrent, prometheus.GaugeValue, float64(flowStats.OutputThroughput.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowOutputLifetime, prometheus.CounterValue, float64(flowStats.OutputThroughput.Lifetime), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowQueueBackpressureCurrent, prometheus.GaugeValue, float64(flowStats.QueueBackpressure.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowQueueBackpressureLifetime, prometheus.CounterValue, float64(flowStats.QueueBackpressure.Lifetime), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyCurrent, prometheus.GaugeValue, float64(flowStats.WorkerConcurrency.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyLifetime, prometheus.CounterValue, float64(flowStats.WorkerConcurrency.Lifetime), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowInputCurrent, prometheus.GaugeValue, flowStats.InputThroughput.Current, pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowInputLifetime, prometheus.GaugeValue, flowStats.InputThroughput.Lifetime, pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowFilterCurrent, prometheus.GaugeValue, flowStats.FilterThroughput.Current, pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowFilterLifetime, prometheus.GaugeValue, flowStats.FilterThroughput.Lifetime, pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowOutputCurrent, prometheus.GaugeValue, flowStats.OutputThroughput.Current, pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowOutputLifetime, prometheus.GaugeValue, flowStats.OutputThroughput.Lifetime, pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowQueueBackpressureCurrent, prometheus.GaugeValue, flowStats.QueueBackpressure.Current, pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowQueueBackpressureLifetime, prometheus.GaugeValue, flowStats.QueueBackpressure.Lifetime, pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyCurrent, prometheus.GaugeValue, flowStats.WorkerConcurrency.Current, pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyLifetime, prometheus.GaugeValue, flowStats.WorkerConcurrency.Lifetime, pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowWorkerUtilizationCurrent, prometheus.GaugeValue, flowStats.WorkerUtilization.Current, pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowWorkerUtilizationLifetime, prometheus.GaugeValue, flowStats.WorkerUtilization.Lifetime, pipelineID)

deadLetterQueueStats := pipeStats.DeadLetterQueue
ch <- prometheus.MustNewConstMetric(collector.DeadLetterQueueMaxSizeInBytes, prometheus.GaugeValue, float64(deadLetterQueueStats.MaxQueueSizeInBytes), pipelineID)
Expand Down
21 changes: 18 additions & 3 deletions fetcher/responses/__snapshots__/nodestats_response_test.snap
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ responses.NodeStatsResponse{
OutputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0, Lifetime:24.63},
QueueBackpressure: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:1, Lifetime:0.9743},
WorkerConcurrency: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:10, Lifetime:9.752},
WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{},
},
Reloads: responses.ReloadResponse{},
Os: responses.OsResponse{},
Expand All @@ -60,8 +61,15 @@ responses.NodeStatsResponse{
".monitoring-logstash": {
Monitoring: responses.PipelineLogstashMonitoringResponse{},
Events: struct { Out int "json:\"out\""; Filtered int "json:\"filtered\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" }{},
Flow: responses.FlowResponse{},
Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\"" } "json:\"outputs\"" }{
Flow: responses.FlowResponse{
InputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:1, Lifetime:2},
FilterThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:3, Lifetime:4},
OutputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:9, Lifetime:10},
QueueBackpressure: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:7, Lifetime:8},
WorkerConcurrency: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:5, Lifetime:6},
WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:11, Lifetime:12},
},
Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\""; Flow struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\"" } "json:\"flow\"" } "json:\"outputs\"" }{
Inputs: {
{
ID: "9a9bed30135e19c8047fe6aa0588b70b15280fb9161fea8ed8e7368e1fb1e6d3",
Expand All @@ -80,6 +88,9 @@ responses.NodeStatsResponse{
Events: struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" }{},
Documents: struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" }{},
BulkRequests: struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" }{},
Flow: struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\"" }{
WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0.5, Lifetime:1.5},
},
},
},
},
Expand Down Expand Up @@ -107,8 +118,9 @@ responses.NodeStatsResponse{
OutputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0, Lifetime:24.95},
QueueBackpressure: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:1, Lifetime:0.9872},
WorkerConcurrency: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:10, Lifetime:9.882},
WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:100, Lifetime:98.82},
},
Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\"" } "json:\"outputs\"" }{
Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\""; Flow struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\"" } "json:\"flow\"" } "json:\"outputs\"" }{
Inputs: {
{
ID: "5ee0ea3d45c32bab3b41963bd900e758ba6e193a11079649302574c706fd5e2f",
Expand Down Expand Up @@ -183,6 +195,9 @@ responses.NodeStatsResponse{
WithErrors: 0,
Responses: {"200":10},
},
Flow: struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\"" }{
WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0, Lifetime:0.9756},
},
},
},
},
Expand Down
10 changes: 10 additions & 0 deletions fetcher/responses/nodestats_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ type FlowResponse struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"worker_concurrency"`
WorkerUtilization struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"worker_utilization"`
}

type SinglePipelineResponse struct {
Expand Down Expand Up @@ -165,6 +169,12 @@ type SinglePipelineResponse struct {
WithErrors int `json:"with_errors"`
Responses map[string]int `json:"responses"`
} `json:"bulk_requests"`
Flow struct {
WorkerUtilization struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"worker_utilization"`
} `json:"flow"`
} `json:"outputs"`
} `json:"plugins"`
Reloads PipelineReloadResponse `json:"reloads"`
Expand Down
28 changes: 14 additions & 14 deletions fixtures/node_stats.json
Original file line number Diff line number Diff line change
Expand Up @@ -119,28 +119,28 @@
},
"flow": {
"input_throughput": {
"current": 0.0,
"lifetime": 0.0
"current": 1.0,
"lifetime": 2.0
},
"filter_throughput": {
"current": 0.0,
"lifetime": 0.0
"current": 3.0,
"lifetime": 4.0
},
"worker_concurrency": {
"current": 0.0,
"lifetime": 0.0
"current": 5.0,
"lifetime": 6.0
},
"queue_backpressure": {
"current": 0.0,
"lifetime": 0.0
"current": 7.0,
"lifetime": 8.0
},
"output_throughput": {
"current": 0.0,
"lifetime": 0.0
"current": 9.0,
"lifetime": 10.0
},
"worker_utilization": {
"current": 0.0,
"lifetime": 0.0
"current": 11.0,
"lifetime": 12.0
}
},
"plugins": {
Expand Down Expand Up @@ -170,8 +170,8 @@
"flow": {
"worker_millis_per_event": {},
"worker_utilization": {
"current": 0.0,
"lifetime": 0.0
"current": 0.5,
"lifetime": 1.5
}
}
}
Expand Down

0 comments on commit ac2b46c

Please sign in to comment.