Skip to content

Commit

Permalink
Pipelines collector refactor (#258)
Browse files Browse the repository at this point in the history
* Fix indentation

* Add metrics helper to create metrics

* Test metrics helper

* Fix tests for metrics helper

* Test for different metric value types

* Refactor nodestats and add methods to metrics helper

* Refactor nodeinfo collector and correct exporter config tests

* Add timestamp metric creation to metric helper

* Merge with master

* Test timestamp metric

* Refactor pipeline subcollector and fix letter-case in timestamp extract function

* Remove duplicate in SECURITY.md

* Improve readability by expanding variable names

---------

Co-authored-by: Jakub <kubasurdej@gmail.com>
  • Loading branch information
satk0 and kuskoman authored Feb 11, 2024
1 parent f980356 commit e68f45d
Show file tree
Hide file tree
Showing 6 changed files with 383 additions and 283 deletions.
2 changes: 0 additions & 2 deletions SECURITY.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

## Reporting a Vulnerability

## Reporting a Vulnerability

If you believe you have found a security vulnerability in any version of our project, we encourage you to let us know as soon as possible. We will investigate all legitimate reports and do our best to quickly fix the problem.

Please report vulnerabilities by sending an email to [kubasurdej@gmail.com](mailto:kubasurdej@gmail.com). Include the following information in your report:
Expand Down
58 changes: 29 additions & 29 deletions internal/collectors/nodeinfo/nodeinfo_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/prometheus/client_golang/prometheus"

"github.com/kuskoman/logstash-exporter/internal/prometheus_helper"
"github.com/kuskoman/logstash-exporter/internal/fetcher/logstash_client"
"github.com/kuskoman/logstash-exporter/internal/fetcher/responses"
"github.com/kuskoman/logstash-exporter/internal/prometheus_helper"
"github.com/kuskoman/logstash-exporter/pkg/config"
)

Expand All @@ -25,42 +25,42 @@ var (
type NodeinfoCollector struct {
clients []logstash_client.Client

NodeInfos *prometheus.Desc
BuildInfos *prometheus.Desc
NodeInfos *prometheus.Desc
BuildInfos *prometheus.Desc

Up *prometheus.Desc
Up *prometheus.Desc

PipelineWorkers *prometheus.Desc
PipelineBatchSize *prometheus.Desc
PipelineBatchDelay *prometheus.Desc

Status *prometheus.Desc
Status *prometheus.Desc
}

func NewNodeinfoCollector(clients []logstash_client.Client) *NodeinfoCollector {
descHelper := prometheus_helper.SimpleDescHelper{Namespace: namespace, Subsystem: subsystem}

return &NodeinfoCollector{
clients: clients,
NodeInfos: descHelper.NewDesc("node",
clients: clients,
NodeInfos: descHelper.NewDesc("node",
"A metric with a constant '1' value labeled by node name, version, host, http_address, and id of the logstash instance.",
"name", "version", "http_address", "host", "id",
),
BuildInfos: descHelper.NewDesc("build",
BuildInfos: descHelper.NewDesc("build",
"A metric with a constant '1' value labeled by build date, sha, and snapshot of the logstash instance.",
"date", "sha", "snapshot"),

Up: descHelper.NewDesc("up",
Up: descHelper.NewDesc("up",
"A metric that returns 1 if the node is up, 0 otherwise."),
PipelineWorkers: descHelper.NewDesc("pipeline_workers",
PipelineWorkers: descHelper.NewDesc("pipeline_workers",
"Number of worker threads that will process pipeline events.",
),
PipelineBatchSize: descHelper.NewDesc("pipeline_batch_size",
PipelineBatchSize: descHelper.NewDesc("pipeline_batch_size",
"Number of events to retrieve from the input queue before sending to the filter and output stages."),
PipelineBatchDelay: descHelper.NewDesc("pipeline_batch_delay",
"Amount of time to wait for events to fill the batch before sending to the filter and output stages."),

Status: descHelper.NewDesc("status",
Status: descHelper.NewDesc("status",
"A metric with a constant '1' value labeled by status.",
"status"),
}
Expand Down Expand Up @@ -101,52 +101,52 @@ func (c *NodeinfoCollector) Collect(ctx context.Context, ch chan<- prometheus.Me
return errors.New(errorString)
}

func (c *NodeinfoCollector) collectSingleInstance(client logstash_client.Client, ctx context.Context, ch chan<- prometheus.Metric) error {
func (collector *NodeinfoCollector) collectSingleInstance(client logstash_client.Client, ctx context.Context, ch chan<- prometheus.Metric) error {
endpoint := client.GetEndpoint()
mh := prometheus_helper.SimpleMetricsHelper{Channel: ch, Labels: []string{endpoint}}
metricsHelper := prometheus_helper.SimpleMetricsHelper{Channel: ch, Labels: []string{endpoint}}

nodeInfo, err := client.GetNodeInfo(ctx)
if err != nil {
status := c.getUpStatus(nodeInfo, err)
status := collector.getUpStatus(nodeInfo, err)

// ***** UP *****
mh.NewIntMetric(c.Up, prometheus.GaugeValue, status)
metricsHelper.NewIntMetric(collector.Up, prometheus.GaugeValue, status)
// **************

return err
}

// ***** NODE *****
mh.Labels = []string{nodeInfo.Name, nodeInfo.Version, nodeInfo.Host, nodeInfo.HTTPAddress, nodeInfo.ID, endpoint}
mh.NewIntMetric(c.NodeInfos, prometheus.CounterValue, 1)
metricsHelper.Labels = []string{nodeInfo.Name, nodeInfo.Version, nodeInfo.Host, nodeInfo.HTTPAddress, nodeInfo.ID, endpoint}
metricsHelper.NewIntMetric(collector.NodeInfos, prometheus.CounterValue, 1)
// ****************

// ***** BUILD *****
mh.Labels = []string{nodeInfo.BuildDate, nodeInfo.BuildSHA, strconv.FormatBool(nodeInfo.BuildSnapshot), endpoint}
mh.NewIntMetric(c.BuildInfos, prometheus.CounterValue, 1)
metricsHelper.Labels = []string{nodeInfo.BuildDate, nodeInfo.BuildSHA, strconv.FormatBool(nodeInfo.BuildSnapshot), endpoint}
metricsHelper.NewIntMetric(collector.BuildInfos, prometheus.CounterValue, 1)
// *****************

mh.Labels = []string{endpoint}
metricsHelper.Labels = []string{endpoint}

// ***** UP *****
mh.NewIntMetric(c.Up, prometheus.GaugeValue, 1)
metricsHelper.NewIntMetric(collector.Up, prometheus.GaugeValue, 1)
// **************

// ***** PIPELINE *****
mh.NewIntMetric(c.PipelineWorkers, prometheus.CounterValue, nodeInfo.Pipeline.Workers)
mh.NewIntMetric(c.PipelineBatchSize, prometheus.CounterValue, nodeInfo.Pipeline.BatchSize)
mh.NewIntMetric( c.PipelineBatchDelay, prometheus.CounterValue, nodeInfo.Pipeline.BatchDelay)
metricsHelper.NewIntMetric(collector.PipelineWorkers, prometheus.CounterValue, nodeInfo.Pipeline.Workers)
metricsHelper.NewIntMetric(collector.PipelineBatchSize, prometheus.CounterValue, nodeInfo.Pipeline.BatchSize)
metricsHelper.NewIntMetric(collector.PipelineBatchDelay, prometheus.CounterValue, nodeInfo.Pipeline.BatchDelay)
// ********************

// ***** STATUS *****
mh.Labels = []string{nodeInfo.Status, endpoint}
mh.NewIntMetric(c.Status, prometheus.CounterValue, 1)
// ******************
metricsHelper.Labels = []string{nodeInfo.Status, endpoint}
metricsHelper.NewIntMetric(collector.Status, prometheus.CounterValue, 1)
// ******************

return nil
}

func (c *NodeinfoCollector) getUpStatus(nodeinfo *responses.NodeInfoResponse, err error) int{
func (c *NodeinfoCollector) getUpStatus(nodeinfo *responses.NodeInfoResponse, err error) int {
status := 1
if err != nil {
status = 0
Expand Down
Loading

0 comments on commit e68f45d

Please sign in to comment.