Skip to content

Commit

Permalink
Fix flow lifetime metrics, add flow utilization metrics kuskoman#366 (k…
Browse files Browse the repository at this point in the history
  • Loading branch information
satk0 authored Oct 10, 2024
1 parent 1661c54 commit ecade38
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 7 deletions.
16 changes: 11 additions & 5 deletions internal/collectors/nodestats/pipeline_subcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type PipelineSubcollector struct {
FlowQueueBackpressureLifetime *prometheus.Desc
FlowWorkerConcurrencyCurrent *prometheus.Desc
FlowWorkerConcurrencyLifetime *prometheus.Desc
FlowWorkerUtilizationCurrent *prometheus.Desc
FlowWorkerUtilizationLifetime *prometheus.Desc

DeadLetterQueueMaxSizeInBytes *prometheus.Desc
DeadLetterQueueSizeInBytes *prometheus.Desc
Expand Down Expand Up @@ -103,6 +105,8 @@ func NewPipelineSubcollector() *PipelineSubcollector {
FlowQueueBackpressureLifetime: descHelper.NewDesc("flow_queue_backpressure_lifetime", "Lifetime number of events in the backpressure queue.", "pipeline"),
FlowWorkerConcurrencyCurrent: descHelper.NewDesc("flow_worker_concurrency_current", "Current number of workers.", "pipeline"),
FlowWorkerConcurrencyLifetime: descHelper.NewDesc("flow_worker_concurrency_lifetime", "Lifetime number of workers.", "pipeline"),
FlowWorkerUtilizationCurrent: descHelper.NewDesc("flow_worker_utilization_current", "Current worker utilization.", "pipeline"),
FlowWorkerUtilizationLifetime: descHelper.NewDesc("flow_worker_utilization_lifetime", "Lifetime worker utilization.", "pipeline"),

DeadLetterQueueMaxSizeInBytes: descHelper.NewDesc("dead_letter_queue_max_size_in_bytes", "Maximum size of the dead letter queue in bytes.", "pipeline"),
DeadLetterQueueSizeInBytes: descHelper.NewDesc("dead_letter_queue_size_in_bytes", "Current size of the dead letter queue in bytes.", "pipeline"),
Expand Down Expand Up @@ -150,15 +154,17 @@ func (subcollector *PipelineSubcollector) Collect(pipeStats *responses.SinglePip
// ***** FLOW *****
flowStats := pipeStats.Flow
metricsHelper.NewFloatMetric(subcollector.FlowInputCurrent, prometheus.GaugeValue, flowStats.InputThroughput.Current)
metricsHelper.NewFloatMetric(subcollector.FlowInputLifetime, prometheus.CounterValue, flowStats.InputThroughput.Lifetime)
metricsHelper.NewFloatMetric(subcollector.FlowInputLifetime, prometheus.GaugeValue, flowStats.InputThroughput.Lifetime)
metricsHelper.NewFloatMetric(subcollector.FlowFilterCurrent, prometheus.GaugeValue, flowStats.FilterThroughput.Current)
metricsHelper.NewFloatMetric(subcollector.FlowFilterLifetime, prometheus.CounterValue, flowStats.FilterThroughput.Lifetime)
metricsHelper.NewFloatMetric(subcollector.FlowFilterLifetime, prometheus.GaugeValue, flowStats.FilterThroughput.Lifetime)
metricsHelper.NewFloatMetric(subcollector.FlowOutputCurrent, prometheus.GaugeValue, flowStats.OutputThroughput.Current)
metricsHelper.NewFloatMetric(subcollector.FlowOutputLifetime, prometheus.CounterValue, flowStats.OutputThroughput.Lifetime)
metricsHelper.NewFloatMetric(subcollector.FlowOutputLifetime, prometheus.GaugeValue, flowStats.OutputThroughput.Lifetime)
metricsHelper.NewFloatMetric(subcollector.FlowQueueBackpressureCurrent, prometheus.GaugeValue, flowStats.QueueBackpressure.Current)
metricsHelper.NewFloatMetric(subcollector.FlowQueueBackpressureLifetime, prometheus.CounterValue, flowStats.QueueBackpressure.Lifetime)
metricsHelper.NewFloatMetric(subcollector.FlowQueueBackpressureLifetime, prometheus.GaugeValue, flowStats.QueueBackpressure.Lifetime)
metricsHelper.NewFloatMetric(subcollector.FlowWorkerConcurrencyCurrent, prometheus.GaugeValue, flowStats.WorkerConcurrency.Current)
metricsHelper.NewFloatMetric(subcollector.FlowWorkerConcurrencyLifetime, prometheus.CounterValue, flowStats.WorkerConcurrency.Lifetime)
metricsHelper.NewFloatMetric(subcollector.FlowWorkerConcurrencyLifetime, prometheus.GaugeValue, flowStats.WorkerConcurrency.Lifetime)
metricsHelper.NewFloatMetric(subcollector.FlowWorkerUtilizationCurrent, prometheus.GaugeValue, flowStats.WorkerUtilization.Current)
metricsHelper.NewFloatMetric(subcollector.FlowWorkerUtilizationLifetime, prometheus.GaugeValue, flowStats.WorkerUtilization.Lifetime)
// ****************

// ***** DEAD LETTER QUEUE *****
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ responses.NodeStatsResponse{
OutputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0, Lifetime:24.63},
QueueBackpressure: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:1, Lifetime:0.9743},
WorkerConcurrency: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:10, Lifetime:9.752},
WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{},
},
Reloads: responses.ReloadResponse{},
Os: responses.OsResponse{},
Expand All @@ -61,7 +62,7 @@ responses.NodeStatsResponse{
Monitoring: responses.PipelineLogstashMonitoringResponse{},
Events: responses.EventsResponse{},
Flow: responses.FlowResponse{},
Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\"" } "json:\"outputs\"" }{
Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Flow struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\""; WorkerMillisPerEvent struct { Lifetime float64 "json:\"lifetime\"" } "json:\"worker_millis_per_event\"" } "json:\"flow\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\""; Flow struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\"" } "json:\"flow\"" } "json:\"outputs\"" }{
Inputs: {
{
ID: "9a9bed30135e19c8047fe6aa0588b70b15280fb9161fea8ed8e7368e1fb1e6d3",
Expand All @@ -80,6 +81,7 @@ responses.NodeStatsResponse{
Events: struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" }{},
Documents: struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" }{},
BulkRequests: struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" }{},
Flow: struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\"" }{},
},
},
},
Expand Down Expand Up @@ -107,8 +109,9 @@ responses.NodeStatsResponse{
OutputThroughput: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0, Lifetime:24.95},
QueueBackpressure: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:1, Lifetime:0.9872},
WorkerConcurrency: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:10, Lifetime:9.882},
WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:100, Lifetime:98.82},
},
Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\"" } "json:\"outputs\"" }{
Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Flow struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\""; WorkerMillisPerEvent struct { Lifetime float64 "json:\"lifetime\"" } "json:\"worker_millis_per_event\"" } "json:\"flow\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\""; Flow struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\"" } "json:\"flow\"" } "json:\"outputs\"" }{
Inputs: {
{
ID: "5ee0ea3d45c32bab3b41963bd900e758ba6e193a11079649302574c706fd5e2f",
Expand Down Expand Up @@ -146,31 +149,49 @@ responses.NodeStatsResponse{
ID: "prune-http-input-fields",
Name: "prune",
Events: struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" }{Out:1250, In:1250, DurationInMillis:127},
Flow: struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\""; WorkerMillisPerEvent struct { Lifetime float64 "json:\"lifetime\"" } "json:\"worker_millis_per_event\"" }{
WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0, Lifetime:0.02535},
WorkerMillisPerEvent: struct { Lifetime float64 "json:\"lifetime\"" }{Lifetime:0.1016},
},
},
{
ID: "ca953dac49c8fd3b00ba8275af10f9c6bcd9ca95755cd7892952966c5a13d427",
Name: "ruby",
Events: struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" }{Out:1250, In:2500, DurationInMillis:489610},
Flow: struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\""; WorkerMillisPerEvent struct { Lifetime float64 "json:\"lifetime\"" } "json:\"worker_millis_per_event\"" }{
WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:100, Lifetime:97.74},
WorkerMillisPerEvent: struct { Lifetime float64 "json:\"lifetime\"" }{Lifetime:195.8},
},
},
{
ID: "drop-non-existent",
Name: "drop",
Events: struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" }{},
Flow: struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\""; WorkerMillisPerEvent struct { Lifetime float64 "json:\"lifetime\"" } "json:\"worker_millis_per_event\"" }{},
},
{
ID: "json-filter",
Name: "json",
Events: struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" }{Out:1250, In:1250, DurationInMillis:214},
Flow: struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\""; WorkerMillisPerEvent struct { Lifetime float64 "json:\"lifetime\"" } "json:\"worker_millis_per_event\"" }{
WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0, Lifetime:0.04272},
WorkerMillisPerEvent: struct { Lifetime float64 "json:\"lifetime\"" }{Lifetime:0.1712},
},
},
{
ID: "mutate-path-001",
Name: "mutate",
Events: struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" }{Out:1250, In:1250, DurationInMillis:170},
Flow: struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\""; WorkerMillisPerEvent struct { Lifetime float64 "json:\"lifetime\"" } "json:\"worker_millis_per_event\"" }{
WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0, Lifetime:0.03394},
WorkerMillisPerEvent: struct { Lifetime float64 "json:\"lifetime\"" }{Lifetime:0.136},
},
},
{
ID: "drop-80-percent",
Name: "drop",
Events: struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" }{},
Flow: struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\""; WorkerMillisPerEvent struct { Lifetime float64 "json:\"lifetime\"" } "json:\"worker_millis_per_event\"" }{},
},
},
Outputs: {
Expand All @@ -183,6 +204,9 @@ responses.NodeStatsResponse{
WithErrors: 0,
Responses: {"200":10},
},
Flow: struct { WorkerUtilization struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" } "json:\"worker_utilization\"" }{
WorkerUtilization: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{Current:0, Lifetime:0.9756},
},
},
},
},
Expand Down
19 changes: 19 additions & 0 deletions internal/fetcher/responses/nodestats_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ type FlowResponse struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"worker_concurrency"`
WorkerUtilization struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"worker_utilization"`
}

type SinglePipelineResponse struct {
Expand Down Expand Up @@ -131,6 +135,15 @@ type SinglePipelineResponse struct {
In int `json:"in"`
DurationInMillis int `json:"duration_in_millis"`
} `json:"events"`
Flow struct {
WorkerUtilization struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"worker_utilization"`
WorkerMillisPerEvent struct {
Lifetime float64 `json:"lifetime"`
} `json:"worker_millis_per_event"`
} `json:"flow"`
} `json:"filters"`
Outputs []struct {
ID string `json:"id"`
Expand All @@ -148,6 +161,12 @@ type SinglePipelineResponse struct {
WithErrors int `json:"with_errors"`
Responses map[string]int `json:"responses"`
} `json:"bulk_requests"`
Flow struct {
WorkerUtilization struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"worker_utilization"`
} `json:"flow"`
} `json:"outputs"`
} `json:"plugins"`
Reloads PipelineReloadResponse `json:"reloads"`
Expand Down
2 changes: 2 additions & 0 deletions scripts/snapshots/metric_names.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,5 @@ logstash_stats_flow_queue_backpressure_current
logstash_stats_flow_queue_backpressure_lifetime
logstash_stats_flow_worker_concurrency_current
logstash_stats_flow_worker_concurrency_lifetime
logstash_stats_pipeline_flow_worker_utilization_current
logstash_stats_pipeline_flow_worker_utilization_lifetime

0 comments on commit ecade38

Please sign in to comment.