Skip to content

Commit

Permalink
Add some of the dead_letter_queue metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
kuskoman committed Jun 16, 2023
1 parent 79d685a commit 859ccd6
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 23 deletions.
10 changes: 8 additions & 2 deletions .docker/logstash.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ input {
message => '{"message": "dummy log"}'
codec => plain {id => "plain-codec-001"}
}

dead_letter_queue {
path => "/usr/share/logstash/data/dead_letter_queue"
commit_offsets => true
pipeline_id => "main"
}
}

filter {
Expand All @@ -14,13 +20,13 @@ filter {
}
# There are too many of these. Drop 80% of them.
if [message][message] == "dummy log" {
drop {
drop {
id => "drop-80-percent"
percentage => 80
}
}
if [massage][non_existent] {
drop {
drop {
id => "drop-non-existent"
}
}
Expand Down
4 changes: 4 additions & 0 deletions .docker/logstash.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: ["http://elasticsearch:9200"]
dead_letter_queue.enable: true
path.dead_letter_queue: "/usr/share/logstash/data/dead_letter_queue"
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ Table of exported metrics:
| logstash_stats_jvm_threads_count | gauge | Number of live threads including both daemon and non-daemon threads. |
| logstash_stats_jvm_threads_peak_count | gauge | Peak live thread count since the Java virtual machine started or peak was reset. |
| logstash_stats_jvm_uptime_millis | gauge | Uptime of the JVM in milliseconds. |
| logstash_stats_pipeline_dead_letter_queue_dropped_events | counter | Number of events dropped by the dead letter queue. |
| logstash_stats_pipeline_dead_letter_queue_expired_events | counter | Number of events expired in the dead letter queue. |
| logstash_stats_pipeline_dead_letter_queue_max_size_in_bytes | counter | Maximum size of the dead letter queue in bytes. |
| logstash_stats_pipeline_dead_letter_queue_size_in_bytes | counter | Current size of the dead letter queue in bytes. |
| logstash_stats_pipeline_events_duration | counter | Time needed to process event. |
| logstash_stats_pipeline_events_filtered | counter | Number of events that have been filtered out by this pipeline. |
| logstash_stats_pipeline_events_in | counter | Number of events that have been inputted into this pipeline. |
Expand Down
16 changes: 16 additions & 0 deletions collectors/nodestats/pipeline_subcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ type PipelineSubcollector struct {
FlowQueueBackpressureLifetime *prometheus.Desc
FlowWorkerConcurrencyCurrent *prometheus.Desc
FlowWorkerConcurrencyLifetime *prometheus.Desc

DeadLetterQueueMaxSizeInBytes *prometheus.Desc
DeadLetterQueueSizeInBytes *prometheus.Desc
DeadLetterQueueDroppedEvents *prometheus.Desc
DeadLetterQueueExpiredEvents *prometheus.Desc
}

func NewPipelineSubcollector() *PipelineSubcollector {
Expand Down Expand Up @@ -98,6 +103,11 @@ func NewPipelineSubcollector() *PipelineSubcollector {
FlowQueueBackpressureLifetime: descHelper.NewDescWithHelpAndLabels("flow_queue_backpressure_lifetime", "Lifetime number of events in the backpressure queue.", "pipeline"),
FlowWorkerConcurrencyCurrent: descHelper.NewDescWithHelpAndLabels("flow_worker_concurrency_current", "Current number of workers.", "pipeline"),
FlowWorkerConcurrencyLifetime: descHelper.NewDescWithHelpAndLabels("flow_worker_concurrency_lifetime", "Lifetime number of workers.", "pipeline"),

DeadLetterQueueMaxSizeInBytes: descHelper.NewDescWithHelpAndLabels("dead_letter_queue_max_size_in_bytes", "Maximum size of the dead letter queue in bytes.", "pipeline"),
DeadLetterQueueSizeInBytes: descHelper.NewDescWithHelpAndLabels("dead_letter_queue_size_in_bytes", "Current size of the dead letter queue in bytes.", "pipeline"),
DeadLetterQueueDroppedEvents: descHelper.NewDescWithHelpAndLabels("dead_letter_queue_dropped_events", "Number of events dropped by the dead letter queue.", "pipeline"),
DeadLetterQueueExpiredEvents: descHelper.NewDescWithHelpAndLabels("dead_letter_queue_expired_events", "Number of events expired in the dead letter queue.", "pipeline"),
}
}

Expand Down Expand Up @@ -139,6 +149,12 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipeli
ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyCurrent, prometheus.GaugeValue, float64(flowStats.WorkerConcurrency.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyLifetime, prometheus.CounterValue, float64(flowStats.WorkerConcurrency.Lifetime), pipelineID)

deadLetterQueueStats := pipeStats.DeadLetterQueue
ch <- prometheus.MustNewConstMetric(collector.DeadLetterQueueMaxSizeInBytes, prometheus.CounterValue, float64(deadLetterQueueStats.MaxQueueSizeInBytes), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.DeadLetterQueueSizeInBytes, prometheus.CounterValue, float64(deadLetterQueueStats.QueueSizeInBytes), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.DeadLetterQueueDroppedEvents, prometheus.CounterValue, float64(deadLetterQueueStats.DroppedEvents), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.DeadLetterQueueExpiredEvents, prometheus.CounterValue, float64(deadLetterQueueStats.ExpiredEvents), pipelineID)

// Output error metrics
for _, output := range pipeStats.Plugins.Outputs {
pluginID := output.ID
Expand Down
9 changes: 5 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
version: "3.8"
services:
logstash:
image: docker.elastic.co/logstash/logstash:8.6.1
image: docker.elastic.co/logstash/logstash:8.8.1
restart: unless-stopped
volumes:
- logstash-data:/usr/share/logstash/data
- ./.docker/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
- ./.docker/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
- ./.docker/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
depends_on:
- elasticsearch
healthcheck:
Expand All @@ -22,7 +23,7 @@ services:
- ${LOGSTASH_PORT:-5044}:5044
- ${LOGSTASH_STATS_PORT:-9600}:9600
prometheus:
image: prom/prometheus:v2.42.0
image: prom/prometheus:v2.44.0
restart: unless-stopped
volumes:
- prometheus-data:/prometheus
Expand All @@ -41,7 +42,7 @@ services:
timeout: 10s
retries: 8
elasticsearch:
image: elasticsearch:8.6.1
image: elasticsearch:8.8.0
restart: unless-stopped
volumes:
- elasticsearch-data:/usr/share/elasticsearch/data
Expand Down
26 changes: 14 additions & 12 deletions fetcher/responses/__snapshots__/nodestats_response_test.snap
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
[TestNodeStatsResponseStructure - 1]
Unmarshalled NodestatsResponse
responses.NodeStatsResponse{
Host: "9e6e14cf5532",
Version: "8.6.1",
Host: "f26e584ecf05",
Version: "8.8.1",
HttpAddress: "0.0.0.0:9600",
Id: "a2c7110e-5ccf-4226-bc9b-e773710e66a0",
Name: "9e6e14cf5532",
EphemeralId: "4a2ee04f-2733-4eaa-887d-675bad27f07c",
Id: "096d672d-50d5-420b-a27c-254c089bdd78",
Name: "f26e584ecf05",
EphemeralId: "25135ee3-be69-4076-bda1-e27524d9ee93",
Status: "green",
Snapshot: false,
Pipeline: responses.PipelineResponse{Workers:16, BatchSize:125, BatchDelay:50},
Expand Down Expand Up @@ -73,10 +73,11 @@ responses.NodeStatsResponse{
Outputs: {
},
},
Reloads: responses.PipelineReloadResponse{},
Queue: struct { Type string "json:\"type\""; EventsCount int "json:\"events_count\""; QueueSizeInBytes int "json:\"queue_size_in_bytes\""; MaxQueueSizeInBytes int "json:\"max_queue_size_in_bytes\"" }{},
Hash: "",
EphemeralID: "",
Reloads: responses.PipelineReloadResponse{},
Queue: struct { Type string "json:\"type\""; EventsCount int "json:\"events_count\""; QueueSizeInBytes int "json:\"queue_size_in_bytes\""; MaxQueueSizeInBytes int "json:\"max_queue_size_in_bytes\"" }{},
DeadLetterQueue: struct { MaxQueueSizeInBytes int "json:\"max_queue_size_in_bytes\""; QueueSizeInBytes int "json:\"queue_size_in_bytes\""; DroppedEvents int "json:\"dropped_events\""; ExpiredEvents int "json:\"expired_events\""; StoragePolicy string "json:\"storage_policy\"" }{},
Hash: "",
EphemeralID: "",
},
"main": {
Monitoring: responses.PipelineLogstashMonitoringResponse{},
Expand Down Expand Up @@ -150,9 +151,10 @@ responses.NodeStatsResponse{
Backtrace: {"org/logstash/execution/AbstractPipelineExt.java:151:in `reload_pipeline'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:181:in `block in reload_pipeline'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"},
},
},
Queue: struct { Type string "json:\"type\""; EventsCount int "json:\"events_count\""; QueueSizeInBytes int "json:\"queue_size_in_bytes\""; MaxQueueSizeInBytes int "json:\"max_queue_size_in_bytes\"" }{Type:"memory", EventsCount:0, QueueSizeInBytes:0, MaxQueueSizeInBytes:0},
Hash: "a73729cc9c29203931db21553c5edba063820a7e40d16cb5053be75cc3811a17",
EphemeralID: "a5c63d09-1ba6-4d67-90a5-075f468a7ab0",
Queue: struct { Type string "json:\"type\""; EventsCount int "json:\"events_count\""; QueueSizeInBytes int "json:\"queue_size_in_bytes\""; MaxQueueSizeInBytes int "json:\"max_queue_size_in_bytes\"" }{Type:"memory", EventsCount:0, QueueSizeInBytes:0, MaxQueueSizeInBytes:0},
DeadLetterQueue: struct { MaxQueueSizeInBytes int "json:\"max_queue_size_in_bytes\""; QueueSizeInBytes int "json:\"queue_size_in_bytes\""; DroppedEvents int "json:\"dropped_events\""; ExpiredEvents int "json:\"expired_events\""; StoragePolicy string "json:\"storage_policy\"" }{MaxQueueSizeInBytes:1073741824, QueueSizeInBytes:1, DroppedEvents:0, ExpiredEvents:0, StoragePolicy:"drop_newer"},
Hash: "a73729cc9c29203931db21553c5edba063820a7e40d16cb5053be75cc3811a17",
EphemeralID: "a5c63d09-1ba6-4d67-90a5-075f468a7ab0",
},
},
}
Expand Down
8 changes: 8 additions & 0 deletions fetcher/responses/nodestats_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ type SinglePipelineResponse struct {
QueueSizeInBytes int `json:"queue_size_in_bytes"`
MaxQueueSizeInBytes int `json:"max_queue_size_in_bytes"`
} `json:"queue"`
DeadLetterQueue struct {
MaxQueueSizeInBytes int `json:"max_queue_size_in_bytes"`
// todo: research how LastError is returned
QueueSizeInBytes int `json:"queue_size_in_bytes"`
DroppedEvents int `json:"dropped_events"`
ExpiredEvents int `json:"expired_events"`
StoragePolicy string `json:"storage_policy"`
} `json:"dead_letter_queue"`
Hash string `json:"hash"`
EphemeralID string `json:"ephemeral_id"`
}
Expand Down
18 changes: 13 additions & 5 deletions fixtures/node_stats.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"host": "9e6e14cf5532",
"version": "8.6.1",
"host": "f26e584ecf05",
"version": "8.8.1",
"http_address": "0.0.0.0:9600",
"id": "a2c7110e-5ccf-4226-bc9b-e773710e66a0",
"name": "9e6e14cf5532",
"ephemeral_id": "4a2ee04f-2733-4eaa-887d-675bad27f07c",
"id": "096d672d-50d5-420b-a27c-254c089bdd78",
"name": "f26e584ecf05",
"ephemeral_id": "25135ee3-be69-4076-bda1-e27524d9ee93",
"status": "green",
"snapshot": false,
"pipeline": {
Expand Down Expand Up @@ -249,6 +249,14 @@
"queue_size_in_bytes": 0,
"max_queue_size_in_bytes": 0
},
"dead_letter_queue": {
"max_queue_size_in_bytes": 1073741824,
"last_error": "no errors",
"queue_size_in_bytes": 1,
"dropped_events": 0,
"expired_events": 0,
"storage_policy": "drop_newer"
},
"hash": "a73729cc9c29203931db21553c5edba063820a7e40d16cb5053be75cc3811a17",
"ephemeral_id": "a5c63d09-1ba6-4d67-90a5-075f468a7ab0"
},
Expand Down
4 changes: 4 additions & 0 deletions scripts/snapshots/metric_names.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,7 @@ logstash_stats_pipeline_flow_queue_backpressure_current
logstash_stats_pipeline_flow_queue_backpressure_lifetime
logstash_stats_pipeline_flow_worker_concurrency_current
logstash_stats_pipeline_flow_worker_concurrency_lifetime
logstash_stats_pipeline_dead_letter_queue_dropped_events
logstash_stats_pipeline_dead_letter_queue_expired_events
logstash_stats_pipeline_dead_letter_queue_max_size_in_bytes
logstash_stats_pipeline_dead_letter_queue_size_in_bytes

0 comments on commit 859ccd6

Please sign in to comment.