diff --git a/README.md b/README.md index ed303cc7..2fa3c750 100644 --- a/README.md +++ b/README.md @@ -97,7 +97,6 @@ The application can be configured using the following environment variables, whi | `LOG_LEVEL` | [Log level](https://pkg.go.dev/golang.org/x/exp/slog#Level) (defaults to "info" if not set) | `""` (empty string) | | `HTTP_TIMEOUT`| Timeout for HTTP requests to Logstash API in [Go duration format](https://golang.org/pkg/time/#ParseDuration) | `2s` | - All configuration variables can be checked in the [config directory](./config/). ## Building diff --git a/collectors/nodestats/pipeline_subcollector.go b/collectors/nodestats/pipeline_subcollector.go index bc5abef3..8ca7ce78 100644 --- a/collectors/nodestats/pipeline_subcollector.go +++ b/collectors/nodestats/pipeline_subcollector.go @@ -55,6 +55,8 @@ type PipelineSubcollector struct { FlowQueueBackpressureLifetime *prometheus.Desc FlowWorkerConcurrencyCurrent *prometheus.Desc FlowWorkerConcurrencyLifetime *prometheus.Desc + FlowWorkerUtilizationCurrent *prometheus.Desc + FlowWorkerUtilizationLifetime *prometheus.Desc DeadLetterQueueMaxSizeInBytes *prometheus.Desc DeadLetterQueueSizeInBytes *prometheus.Desc @@ -102,6 +104,8 @@ func NewPipelineSubcollector() *PipelineSubcollector { FlowQueueBackpressureLifetime: descHelper.NewDescWithHelpAndLabels("flow_queue_backpressure_lifetime", "Lifetime number of events in the backpressure queue.", "pipeline"), FlowWorkerConcurrencyCurrent: descHelper.NewDescWithHelpAndLabels("flow_worker_concurrency_current", "Current number of workers.", "pipeline"), FlowWorkerConcurrencyLifetime: descHelper.NewDescWithHelpAndLabels("flow_worker_concurrency_lifetime", "Lifetime number of workers.", "pipeline"), + FlowWorkerUtilizationCurrent: descHelper.NewDescWithHelpAndLabels("flow_worker_utilization_current", "Current worker utilization.", "pipeline"), + FlowWorkerUtilizationLifetime: descHelper.NewDescWithHelpAndLabels("flow_worker_utilization_lifetime", "Lifetime worker utilization.", "pipeline"), DeadLetterQueueMaxSizeInBytes: descHelper.NewDescWithHelpAndLabels("dead_letter_queue_max_size_in_bytes", "Maximum size of the dead letter queue in bytes.", "pipeline"), DeadLetterQueueSizeInBytes: descHelper.NewDescWithHelpAndLabels("dead_letter_queue_size_in_bytes", "Current size of the dead letter queue in bytes.", "pipeline"), @@ -137,16 +141,18 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipeli ch <- prometheus.MustNewConstMetric(collector.QueueMaxQueueSizeInBytes, prometheus.CounterValue, float64(pipeStats.Queue.MaxQueueSizeInBytes), pipelineID) flowStats := pipeStats.Flow - ch <- prometheus.MustNewConstMetric(collector.FlowInputCurrent, prometheus.GaugeValue, float64(flowStats.InputThroughput.Current), pipelineID) - ch <- prometheus.MustNewConstMetric(collector.FlowInputLifetime, prometheus.CounterValue, float64(flowStats.InputThroughput.Lifetime), pipelineID) - ch <- prometheus.MustNewConstMetric(collector.FlowFilterCurrent, prometheus.GaugeValue, float64(flowStats.FilterThroughput.Current), pipelineID) - ch <- prometheus.MustNewConstMetric(collector.FlowFilterLifetime, prometheus.CounterValue, float64(flowStats.FilterThroughput.Lifetime), pipelineID) - ch <- prometheus.MustNewConstMetric(collector.FlowOutputCurrent, prometheus.GaugeValue, float64(flowStats.OutputThroughput.Current), pipelineID) - ch <- prometheus.MustNewConstMetric(collector.FlowOutputLifetime, prometheus.CounterValue, float64(flowStats.OutputThroughput.Lifetime), pipelineID) - ch <- prometheus.MustNewConstMetric(collector.FlowQueueBackpressureCurrent, prometheus.GaugeValue, float64(flowStats.QueueBackpressure.Current), pipelineID) - ch <- prometheus.MustNewConstMetric(collector.FlowQueueBackpressureLifetime, prometheus.CounterValue, float64(flowStats.QueueBackpressure.Lifetime), pipelineID) - ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyCurrent, prometheus.GaugeValue, float64(flowStats.WorkerConcurrency.Current), pipelineID) - ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyLifetime, prometheus.CounterValue, float64(flowStats.WorkerConcurrency.Lifetime), pipelineID) + ch <- prometheus.MustNewConstMetric(collector.FlowInputCurrent, prometheus.GaugeValue, flowStats.InputThroughput.Current, pipelineID) + ch <- prometheus.MustNewConstMetric(collector.FlowInputLifetime, prometheus.GaugeValue, flowStats.InputThroughput.Lifetime, pipelineID) + ch <- prometheus.MustNewConstMetric(collector.FlowFilterCurrent, prometheus.GaugeValue, flowStats.FilterThroughput.Current, pipelineID) + ch <- prometheus.MustNewConstMetric(collector.FlowFilterLifetime, prometheus.GaugeValue, flowStats.FilterThroughput.Lifetime, pipelineID) + ch <- prometheus.MustNewConstMetric(collector.FlowOutputCurrent, prometheus.GaugeValue, flowStats.OutputThroughput.Current, pipelineID) + ch <- prometheus.MustNewConstMetric(collector.FlowOutputLifetime, prometheus.GaugeValue, flowStats.OutputThroughput.Lifetime, pipelineID) + ch <- prometheus.MustNewConstMetric(collector.FlowQueueBackpressureCurrent, prometheus.GaugeValue, flowStats.QueueBackpressure.Current, pipelineID) + ch <- prometheus.MustNewConstMetric(collector.FlowQueueBackpressureLifetime, prometheus.GaugeValue, flowStats.QueueBackpressure.Lifetime, pipelineID) + ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyCurrent, prometheus.GaugeValue, flowStats.WorkerConcurrency.Current, pipelineID) + ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyLifetime, prometheus.GaugeValue, flowStats.WorkerConcurrency.Lifetime, pipelineID) + ch <- prometheus.MustNewConstMetric(collector.FlowWorkerUtilizationCurrent, prometheus.GaugeValue, flowStats.WorkerUtilization.Current, pipelineID) + ch <- prometheus.MustNewConstMetric(collector.FlowWorkerUtilizationLifetime, prometheus.GaugeValue, flowStats.WorkerUtilization.Lifetime, pipelineID) deadLetterQueueStats := pipeStats.DeadLetterQueue ch <- prometheus.MustNewConstMetric(collector.DeadLetterQueueMaxSizeInBytes, prometheus.GaugeValue, float64(deadLetterQueueStats.MaxQueueSizeInBytes), pipelineID) diff --git a/fetcher/responses/__snapshots__/nodestats_response_test.snap b/fetcher/responses/__snapshots__/nodestats_response_test.snap index d4727f7c..75161ca0 100755 --- a/fetcher/responses/__snapshots__/nodestats_response_test.snap +++ b/fetcher/responses/__snapshots__/nodestats_response_test.snap @@ -52,6 +52,7 @@ responses.NodeStatsResponse{ OutputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0, Lifetime:24.63}, QueueBackpressure: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:1, Lifetime:0.9743}, WorkerConcurrency: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:10, Lifetime:9.752}, + WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{}, }, Reloads: responses.ReloadResponse{}, Os: responses.OsResponse{}, @@ -60,8 +61,15 @@ responses.NodeStatsResponse{ ".monitoring-logstash": { Monitoring: responses.PipelineLogstashMonitoringResponse{}, Events: struct { Out int "json:\"out\""; Filtered int "json:\"filtered\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" }{}, - Flow: responses.FlowResponse{}, - Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\"" } "json:\"outputs\"" }{ + Flow: responses.FlowResponse{ + InputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:1, Lifetime:2}, + FilterThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:3, Lifetime:4}, + OutputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:9, Lifetime:10}, + QueueBackpressure: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:7, Lifetime:8}, + WorkerConcurrency: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:5, Lifetime:6}, + WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:11, Lifetime:12}, + }, + Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\""; Flow struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\"" } "json:\"flow\"" } "json:\"outputs\"" }{ Inputs: { { ID: "9a9bed30135e19c8047fe6aa0588b70b15280fb9161fea8ed8e7368e1fb1e6d3", @@ -80,6 +88,9 @@ responses.NodeStatsResponse{ Events: struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" }{}, Documents: struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" }{}, BulkRequests: struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" }{}, + Flow: struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\"" }{ + WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0.5, Lifetime:1.5}, + }, }, }, }, @@ -107,8 +118,9 @@ responses.NodeStatsResponse{ OutputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0, Lifetime:24.95}, QueueBackpressure: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:1, Lifetime:0.9872}, WorkerConcurrency: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:10, Lifetime:9.882}, + WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:100, Lifetime:98.82}, }, - Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\"" } "json:\"outputs\"" }{ + Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\""; Flow struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\"" } "json:\"flow\"" } "json:\"outputs\"" }{ Inputs: { { ID: "5ee0ea3d45c32bab3b41963bd900e758ba6e193a11079649302574c706fd5e2f", @@ -183,6 +195,9 @@ responses.NodeStatsResponse{ WithErrors: 0, Responses: {"200":10}, }, + Flow: struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\"" }{ + WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0, Lifetime:0.9756}, + }, }, }, }, diff --git a/fetcher/responses/nodestats_response.go b/fetcher/responses/nodestats_response.go index 73229037..4609e8ae 100644 --- a/fetcher/responses/nodestats_response.go +++ b/fetcher/responses/nodestats_response.go @@ -106,6 +106,10 @@ type FlowResponse struct { Current float64 `json:"current"` Lifetime float64 `json:"lifetime"` } `json:"worker_concurrency"` + WorkerUtilization struct { + Current float64 `json:"current"` + Lifetime float64 `json:"lifetime"` + } `json:"worker_utilization"` } type SinglePipelineResponse struct { @@ -165,6 +169,12 @@ type SinglePipelineResponse struct { WithErrors int `json:"with_errors"` Responses map[string]int `json:"responses"` } `json:"bulk_requests"` + Flow struct { + WorkerUtilization struct { + Current float64 `json:"current"` + Lifetime float64 `json:"lifetime"` + } `json:"worker_utilization"` + } `json:"flow"` } `json:"outputs"` } `json:"plugins"` Reloads PipelineReloadResponse `json:"reloads"` diff --git a/fixtures/node_stats.json b/fixtures/node_stats.json index e269266a..e832ed4d 100644 --- a/fixtures/node_stats.json +++ b/fixtures/node_stats.json @@ -119,28 +119,28 @@ }, "flow": { "input_throughput": { - "current": 0.0, - "lifetime": 0.0 + "current": 1.0, + "lifetime": 2.0 }, "filter_throughput": { - "current": 0.0, - "lifetime": 0.0 + "current": 3.0, + "lifetime": 4.0 }, "worker_concurrency": { - "current": 0.0, - "lifetime": 0.0 + "current": 5.0, + "lifetime": 6.0 }, "queue_backpressure": { - "current": 0.0, - "lifetime": 0.0 + "current": 7.0, + "lifetime": 8.0 }, "output_throughput": { - "current": 0.0, - "lifetime": 0.0 + "current": 9.0, + "lifetime": 10.0 }, "worker_utilization": { - "current": 0.0, - "lifetime": 0.0 + "current": 11.0, + "lifetime": 12.0 } }, "plugins": { @@ -170,8 +170,8 @@ "flow": { "worker_millis_per_event": {}, "worker_utilization": { - "current": 0.0, - "lifetime": 0.0 + "current": 0.5, + "lifetime": 1.5 } } } diff --git a/scripts/snapshots/metric_names.txt b/scripts/snapshots/metric_names.txt index e6349c83..8dea59e9 100644 --- a/scripts/snapshots/metric_names.txt +++ b/scripts/snapshots/metric_names.txt @@ -80,3 +80,5 @@ logstash_stats_flow_queue_backpressure_current logstash_stats_flow_queue_backpressure_lifetime logstash_stats_flow_worker_concurrency_current logstash_stats_flow_worker_concurrency_lifetime +logstash_stats_pipeline_flow_worker_utilization_current +logstash_stats_pipeline_flow_worker_utilization_lifetime