Implement pipeline subcollector
kuskoman committed Feb 14, 2023
1 parent acd6046 commit 75a37de
Showing 5 changed files with 111 additions and 376 deletions.
28 changes: 23 additions & 5 deletions collectors/nodestats/nodestats_collector.go
@@ -1,14 +1,24 @@
package nodestats

import (
"log"

"github.com/kuskoman/logstash-exporter/config"
logstashclient "github.com/kuskoman/logstash-exporter/fetcher/logstash_client"
"github.com/kuskoman/logstash-exporter/helpers"
"github.com/prometheus/client_golang/prometheus"
)

const subsystem = "stats"

var (
namespace = config.PrometheusNamespace
descHelper = helpers.SimpleDescHelper{Namespace: namespace, Subsystem: subsystem}
)

type NodestatsCollector struct {
client logstashclient.Client
pipelineSubcollector *PipelineSubcollector

JvmThreadsCount *prometheus.Desc
JvmThreadsPeakCount *prometheus.Desc
@@ -34,13 +44,12 @@ type NodestatsCollector struct {
}

func NewNodestatsCollector(client logstashclient.Client) *NodestatsCollector {
const subsystem = "stats"
namespace := config.PrometheusNamespace
descHelper := helpers.SimpleDescHelper{Namespace: namespace, Subsystem: subsystem}

return &NodestatsCollector{
client: client,

pipelineSubcollector: NewPipelineSubcollector(),

JvmThreadsCount: descHelper.NewDesc("jvm_threads_count"),
JvmThreadsPeakCount: descHelper.NewDesc("jvm_threads_peak_count"),

@@ -93,5 +102,14 @@ func (c *NodestatsCollector) Collect(ch chan<- prometheus.Metric) error {

ch <- prometheus.MustNewConstMetric(c.QueueEventsCount, prometheus.GaugeValue, float64(nodeStats.Queue.EventsCount))

return nil
for pipelineId, pipelineStats := range nodeStats.Pipelines {
err = c.pipelineSubcollector.Collect(&pipelineStats, pipelineId, ch)
if err != nil {
log.Printf("Error collecting pipeline %s, stats: %s", pipelineId, err.Error())
}
// we don't want to stop collecting other pipelines if one of them fails
}

// last error is returned
return err
}
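
For reference, a minimal self-contained sketch of the error-handling pattern the new Collect loop uses: every pipeline is attempted, a per-pipeline failure is only logged, and the error from the last iteration is what the caller gets back. `collectAll` and its map of closures are illustrative stand-ins, not part of the exporter.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// collectAll mirrors the loop in NodestatsCollector.Collect: try every
// pipeline, log a per-pipeline failure, keep going, and return whatever the
// last iteration produced. The map of closures is a stand-in for the real
// pipelineSubcollector.Collect calls.
func collectAll(pipelines map[string]func() error) error {
	var err error
	for id, collect := range pipelines {
		err = collect()
		if err != nil {
			log.Printf("Error collecting pipeline %s, stats: %s", id, err.Error())
		}
		// we don't want to stop collecting other pipelines if one of them fails
	}
	// last error is returned
	return err
}

func main() {
	// Note: map iteration order is not deterministic, so which error ends up
	// being "last" can differ between runs; the same holds for the real
	// nodeStats.Pipelines map.
	lastErr := collectAll(map[string]func() error{
		"main":   func() error { return nil },
		"broken": func() error { return errors.New("stats unavailable") },
	})
	fmt.Println("last error:", lastErr)
}
```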
67 changes: 67 additions & 0 deletions collectors/nodestats/pipeline_subcollector.go
@@ -0,0 +1,67 @@
package nodestats

import (
"log"
"time"

"github.com/kuskoman/logstash-exporter/fetcher/responses"
"github.com/prometheus/client_golang/prometheus"
)

type PipelineSubcollector struct {
EventsOut *prometheus.Desc
EventsFiltered *prometheus.Desc
EventsIn *prometheus.Desc
EventsDuration *prometheus.Desc
EventsQueuePushDuration *prometheus.Desc

ReloadsLastSuccessTimestamp *prometheus.Desc
ReloadsLastFailureTimestamp *prometheus.Desc
ReloadsSuccesses *prometheus.Desc
ReloadsFailures *prometheus.Desc

QueueEventsCount *prometheus.Desc
QueueEventsQueueSize *prometheus.Desc
QueueMaxQueueSizeInBytes *prometheus.Desc
}

func NewPipelineSubcollector() *PipelineSubcollector {
return &PipelineSubcollector{
EventsOut: descHelper.NewDescWithLabels("pipeline_events_out", []string{"pipeline_id"}),
EventsFiltered: descHelper.NewDescWithLabels("pipeline_events_filtered", []string{"pipeline_id"}),
EventsIn: descHelper.NewDescWithLabels("pipeline_events_in", []string{"pipeline_id"}),
EventsDuration: descHelper.NewDescWithLabels("pipeline_events_duration", []string{"pipeline_id"}),
EventsQueuePushDuration: descHelper.NewDescWithLabels("pipeline_events_queue_push_duration", []string{"pipeline_id"}),

ReloadsSuccesses: descHelper.NewDescWithLabels("pipeline_reloads_successes", []string{"pipeline_id"}),
ReloadsFailures: descHelper.NewDescWithLabels("pipeline_reloads_failures", []string{"pipeline_id"}),

QueueEventsCount: descHelper.NewDescWithLabels("pipeline_queue_events_count", []string{"pipeline_id"}),
QueueEventsQueueSize: descHelper.NewDescWithLabels("pipeline_queue_events_queue_size", []string{"pipeline_id"}),
QueueMaxQueueSizeInBytes: descHelper.NewDescWithLabels("pipeline_queue_max_size_in_bytes", []string{"pipeline_id"}),
}
}

func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipelineResponse, pipelineID string, ch chan<- prometheus.Metric) error {
collectingStart := time.Now()
log.Printf("collecting pipeline stats for pipeline %s", pipelineID)

ch <- prometheus.MustNewConstMetric(collector.EventsOut, prometheus.CounterValue, float64(pipeStats.Events.Out), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.EventsFiltered, prometheus.CounterValue, float64(pipeStats.Events.Filtered), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.EventsIn, prometheus.CounterValue, float64(pipeStats.Events.In), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.EventsDuration, prometheus.CounterValue, float64(pipeStats.Events.DurationInMillis), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.EventsQueuePushDuration, prometheus.CounterValue, float64(pipeStats.Events.QueuePushDurationInMillis), pipelineID)

// todo: add restart timestamps

ch <- prometheus.MustNewConstMetric(collector.ReloadsSuccesses, prometheus.CounterValue, float64(pipeStats.Reloads.Successes), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.ReloadsFailures, prometheus.CounterValue, float64(pipeStats.Reloads.Failures), pipelineID)

ch <- prometheus.MustNewConstMetric(collector.QueueEventsCount, prometheus.CounterValue, float64(pipeStats.Queue.EventsCount), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.QueueEventsQueueSize, prometheus.CounterValue, float64(pipeStats.Queue.QueueSizeInBytes), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.QueueMaxQueueSizeInBytes, prometheus.CounterValue, float64(pipeStats.Queue.MaxQueueSizeInBytes), pipelineID)

collectingEnd := time.Now()
log.Printf("collected pipeline stats for pipeline %s in %s", pipelineID, collectingEnd.Sub(collectingStart))
return nil
}
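
To see what the subcollector's channel writes look like from the consumer side, here is a stand-alone sketch (not the exporter's own code or tests): one labelled `*prometheus.Desc`, one `MustNewConstMetric` per pipeline, and the channel drained and decoded the way a test might do it. The `logstash` namespace and the metric values are assumptions made for illustration.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// emitPipelineMetrics imitates the subcollector's pattern in isolation: one
// labelled *prometheus.Desc, and one MustNewConstMetric per pipeline written
// to the channel that the Prometheus client passes to Collect.
func emitPipelineMetrics(ch chan<- prometheus.Metric) {
	eventsOut := prometheus.NewDesc(
		prometheus.BuildFQName("logstash", "stats", "pipeline_events_out"),
		"pipeline_events_out",
		[]string{"pipeline_id"},
		nil,
	)
	for id, value := range map[string]float64{"main": 120, "secondary": 7} {
		ch <- prometheus.MustNewConstMetric(eventsOut, prometheus.CounterValue, value, id)
	}
}

func main() {
	ch := make(chan prometheus.Metric, 8)
	emitPipelineMetrics(ch)
	close(ch)

	// Drain the channel the way a test might, decoding each metric into the
	// client_model DTO to inspect its pipeline_id label and counter value.
	for m := range ch {
		var out dto.Metric
		if err := m.Write(&out); err != nil {
			panic(err)
		}
		label := out.GetLabel()[0]
		fmt.Printf("%s=%q value=%v\n", label.GetName(), label.GetValue(), out.GetCounter().GetValue())
	}
}
```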
35 changes: 16 additions & 19 deletions fetcher/responses/nodestats_response.go
@@ -176,10 +176,6 @@ type SinglePipelineResponse struct {
EphemeralID string `json:"ephemeral_id"`
}

type PipelinesResponse struct {
Pipelines map[string]SinglePipelineResponse `json:"pipelines"`
}

type PipelineLogstashMonitoringResponse struct {
Events struct {
Out int `json:"out"`
@@ -255,19 +251,20 @@ type QueueResponse struct {
}

type NodestatsResponse struct {
Host string `json:"host"`
Version string `json:"version"`
HttpAddress string `json:"http_address"`
Id string `json:"id"`
Name string `json:"name"`
EphemeralId string `json:"ephemeral_id"`
Status string `json:"status"`
Snapshot bool `json:"snapshot"`
Pipeline PipelineResponse `json:"pipeline"`
Jvm JvmResponse `json:"jvm"`
Process ProcessResponse `json:"process"`
Pipelines PipelinesResponse `json:"pipelines"`
Reloads ReloadResponse `json:"reloads"`
Os OsResponse `json:"os"`
Queue QueueResponse `json:"queue"`
Host string `json:"host"`
Version string `json:"version"`
HttpAddress string `json:"http_address"`
Id string `json:"id"`
Name string `json:"name"`
EphemeralId string `json:"ephemeral_id"`
Status string `json:"status"`
Snapshot bool `json:"snapshot"`
Pipeline PipelineResponse `json:"pipeline"`
Jvm JvmResponse `json:"jvm"`
Process ProcessResponse `json:"process"`
Reloads ReloadResponse `json:"reloads"`
Os OsResponse `json:"os"`
Queue QueueResponse `json:"queue"`

Pipelines map[string]SinglePipelineResponse `json:"pipelines"`
}
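
The key change above is that `pipelines` now decodes straight into a `map[string]SinglePipelineResponse` keyed by pipeline id, replacing the removed `PipelinesResponse` wrapper. A small sketch with trimmed-down stand-in structs (not the exporter's real types) shows how such a map unmarshals:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed-down stand-ins for the response structs above; only a subset of the
// real fields is modelled here.
type singlePipeline struct {
	Events struct {
		In  int `json:"in"`
		Out int `json:"out"`
	} `json:"events"`
}

type nodeStats struct {
	Host      string                    `json:"host"`
	Pipelines map[string]singlePipeline `json:"pipelines"`
}

func main() {
	// The node stats API returns "pipelines" as an object keyed by pipeline
	// id, which is why the field is a map rather than a wrapper struct.
	payload := []byte(`{
		"host": "logstash-0",
		"pipelines": {
			"main":      {"events": {"in": 12, "out": 10}},
			"secondary": {"events": {"in": 3,  "out": 3}}
		}
	}`)

	var stats nodeStats
	if err := json.Unmarshal(payload, &stats); err != nil {
		panic(err)
	}
	for id, p := range stats.Pipelines {
		fmt.Printf("pipeline %s on %s: in=%d out=%d\n", id, stats.Host, p.Events.In, p.Events.Out)
	}
}
```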
5 changes: 5 additions & 0 deletions helpers/prometheus_helper.go
@@ -11,3 +11,8 @@ func (h *SimpleDescHelper) NewDesc(name string) *prometheus.Desc {
help := name
return prometheus.NewDesc(prometheus.BuildFQName(h.Namespace, h.Subsystem, name), help, nil, nil)
}

func (h *SimpleDescHelper) NewDescWithLabels(name string, labels []string) *prometheus.Desc {
help := name
return prometheus.NewDesc(prometheus.BuildFQName(h.Namespace, h.Subsystem, name), help, labels, nil)
}
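
A short usage sketch of the extended helper; it assumes a module that requires `github.com/kuskoman/logstash-exporter` and hard-codes the `logstash` namespace where the real collector reads `config.PrometheusNamespace`:

```go
package main

import (
	"fmt"

	"github.com/kuskoman/logstash-exporter/helpers"
)

func main() {
	// Same shape as the package-level helper declared in nodestats_collector.go,
	// with the namespace hard-coded here instead of read from config.
	descHelper := helpers.SimpleDescHelper{Namespace: "logstash", Subsystem: "stats"}

	// Unlabelled descriptor, as used for the node-level metrics.
	fmt.Println(descHelper.NewDesc("jvm_threads_count"))

	// Labelled descriptor, as used by the pipeline subcollector: every metric
	// it emits carries a pipeline_id label value.
	fmt.Println(descHelper.NewDescWithLabels("pipeline_events_out", []string{"pipeline_id"}))
}
```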