Skip to content

Commit

Permalink
Refactor metric declaration logic (kuskoman#241)
Browse files Browse the repository at this point in the history
* Fix indentation

* Add metrics helper to create metrics

* Test metrics helper

* Fix tests for metrics helper

* Test for different metric value types

* Refactor nodestats and add methods to metrics helper

* Refactor nodeinfo collector and correct exporter config tests
  • Loading branch information
satk0 authored Feb 5, 2024
1 parent 984c1ee commit 85383c9
Show file tree
Hide file tree
Showing 9 changed files with 395 additions and 322 deletions.
170 changes: 61 additions & 109 deletions internal/collectors/nodeinfo/nodeinfo_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,77 +9,60 @@ import (

"github.com/prometheus/client_golang/prometheus"

"github.com/kuskoman/logstash-exporter/internal/prometheus_helper"
"github.com/kuskoman/logstash-exporter/internal/fetcher/logstash_client"
"github.com/kuskoman/logstash-exporter/internal/fetcher/responses"
"github.com/kuskoman/logstash-exporter/pkg/config"
)

const subsystem = "info"

var (
namespace = config.PrometheusNamespace
)

// NodeinfoCollector is a custom collector for the /_node/stats endpoint
type NodeinfoCollector struct {
clients []logstash_client.Client

NodeInfos *prometheus.Desc
BuildInfos *prometheus.Desc
NodeInfos *prometheus.Desc
BuildInfos *prometheus.Desc

Up *prometheus.Desc
Up *prometheus.Desc

PipelineWorkers *prometheus.Desc
PipelineBatchSize *prometheus.Desc
PipelineBatchDelay *prometheus.Desc

Status *prometheus.Desc
Status *prometheus.Desc
}

func NewNodeinfoCollector(clients []logstash_client.Client) *NodeinfoCollector {
const subsystem = "info"
namespace := config.PrometheusNamespace
descHelper := prometheus_helper.SimpleDescHelper{Namespace: namespace, Subsystem: subsystem}

return &NodeinfoCollector{
clients: clients,
NodeInfos: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "node"),
clients: clients,
NodeInfos: descHelper.NewDesc("node",
"A metric with a constant '1' value labeled by node name, version, host, http_address, and id of the logstash instance.",
[]string{"name", "version", "http_address", "host", "id", "hostname"},
nil,
"name", "version", "http_address", "host", "id",
),
BuildInfos: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "build"),
BuildInfos: descHelper.NewDesc("build",
"A metric with a constant '1' value labeled by build date, sha, and snapshot of the logstash instance.",
[]string{"date", "sha", "snapshot", "hostname"},
nil,
),
"date", "sha", "snapshot"),

Up: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "up"),
"A metric that returns 1 if the node is up, 0 otherwise.",
[]string{"hostname"},
nil,
),
PipelineWorkers: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "pipeline_workers"),
Up: descHelper.NewDesc("up",
"A metric that returns 1 if the node is up, 0 otherwise."),
PipelineWorkers: descHelper.NewDesc("pipeline_workers",
"Number of worker threads that will process pipeline events.",
[]string{"hostname"},
nil,
),
PipelineBatchSize: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "pipeline_batch_size"),
"Number of events to retrieve from the input queue before sending to the filter and output stages.",
[]string{"hostname"},
nil,
),
PipelineBatchDelay: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "pipeline_batch_delay"),
"Amount of time to wait for events to fill the batch before sending to the filter and output stages.",
[]string{"hostname"},
nil,
),
PipelineBatchSize: descHelper.NewDesc("pipeline_batch_size",
"Number of events to retrieve from the input queue before sending to the filter and output stages."),
PipelineBatchDelay: descHelper.NewDesc("pipeline_batch_delay",
"Amount of time to wait for events to fill the batch before sending to the filter and output stages."),

Status: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "status"),
Status: descHelper.NewDesc("status",
"A metric with a constant '1' value labeled by status.",
[]string{"status", "hostname"},
nil,
),
"status"),
}
}

Expand Down Expand Up @@ -119,88 +102,57 @@ func (c *NodeinfoCollector) Collect(ctx context.Context, ch chan<- prometheus.Me
}

func (c *NodeinfoCollector) collectSingleInstance(client logstash_client.Client, ctx context.Context, ch chan<- prometheus.Metric) error {
endpoint := client.GetEndpoint()
mh := prometheus_helper.SimpleMetricsHelper{Channel: ch, Labels: []string{endpoint}}

nodeInfo, err := client.GetNodeInfo(ctx)
if err != nil {
ch <- c.getUpStatus(nodeInfo, err, client.GetEndpoint())
status := c.getUpStatus(nodeInfo, err)

// ***** UP *****
mh.NewIntMetric(c.Up, prometheus.GaugeValue, status)
// **************

return err
}

endpoint := client.GetEndpoint()
// ***** NODE *****
mh.Labels = []string{nodeInfo.Name, nodeInfo.Version, nodeInfo.Host, nodeInfo.HTTPAddress, nodeInfo.ID, endpoint}
mh.NewIntMetric(c.NodeInfos, prometheus.CounterValue, 1)
// ****************

// ***** BUILD *****
mh.Labels = []string{nodeInfo.BuildDate, nodeInfo.BuildSHA, strconv.FormatBool(nodeInfo.BuildSnapshot), endpoint}
mh.NewIntMetric(c.BuildInfos, prometheus.CounterValue, 1)
// *****************

mh.Labels = []string{endpoint}

// ***** UP *****
mh.NewIntMetric(c.Up, prometheus.GaugeValue, 1)
// **************

// ***** PIPELINE *****
mh.NewIntMetric(c.PipelineWorkers, prometheus.CounterValue, nodeInfo.Pipeline.Workers)
mh.NewIntMetric(c.PipelineBatchSize, prometheus.CounterValue, nodeInfo.Pipeline.BatchSize)
mh.NewIntMetric( c.PipelineBatchDelay, prometheus.CounterValue, nodeInfo.Pipeline.BatchDelay)
// ********************

ch <- prometheus.MustNewConstMetric(
c.NodeInfos,
prometheus.CounterValue,
float64(1),
nodeInfo.Name,
nodeInfo.Version,
nodeInfo.Host,
nodeInfo.HTTPAddress,
nodeInfo.ID,
endpoint,
)

ch <- prometheus.MustNewConstMetric(
c.BuildInfos,
prometheus.CounterValue,
float64(1),
nodeInfo.BuildDate,
nodeInfo.BuildSHA,
strconv.FormatBool(nodeInfo.BuildSnapshot),
endpoint,
)

ch <- prometheus.MustNewConstMetric(
c.Up,
prometheus.GaugeValue,
float64(1),
endpoint,
)

ch <- prometheus.MustNewConstMetric(
c.PipelineWorkers,
prometheus.CounterValue,
float64(nodeInfo.Pipeline.Workers),
endpoint,
)

ch <- prometheus.MustNewConstMetric(
c.PipelineBatchSize,
prometheus.CounterValue,
float64(nodeInfo.Pipeline.BatchSize),
endpoint,
)

ch <- prometheus.MustNewConstMetric(
c.PipelineBatchDelay,
prometheus.CounterValue,
float64(nodeInfo.Pipeline.BatchDelay),
endpoint,
)

ch <- prometheus.MustNewConstMetric(
c.Status,
prometheus.CounterValue,
float64(1),
nodeInfo.Status,
endpoint,
)
// ***** STATUS *****
mh.Labels = []string{nodeInfo.Status, endpoint}
mh.NewIntMetric(c.Status, prometheus.CounterValue, 1)
// ******************

return nil
}

func (c *NodeinfoCollector) getUpStatus(nodeinfo *responses.NodeInfoResponse, err error, endpoint string) prometheus.Metric {
func (c *NodeinfoCollector) getUpStatus(nodeinfo *responses.NodeInfoResponse, err error) int{
status := 1
if err != nil {
status = 0
} else if nodeinfo.Status != "green" && nodeinfo.Status != "yellow" {
status = 0
}

return prometheus.MustNewConstMetric(
c.Up,
prometheus.GaugeValue,
float64(status),
endpoint,
)
return status
}
24 changes: 6 additions & 18 deletions internal/collectors/nodeinfo/nodeinfo_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"os"
"slices"
"testing"
"time"

Expand Down Expand Up @@ -94,15 +95,7 @@ func TestCollectNotNil(t *testing.T) {
}

for _, expectedMetric := range expectedMetrics {
found := false
for _, foundMetric := range foundMetrics {
if foundMetric == expectedMetric {
found = true
break
}
}

if !found {
if !slices.Contains(foundMetrics, expectedMetric) {
t.Errorf("Expected metric %s to be found", expectedMetric)
}
}
Expand Down Expand Up @@ -170,7 +163,7 @@ func TestGetUpStatus(t *testing.T) {
name string
nodeInfo *responses.NodeInfoResponse
err error
expected float64
expected int
}{
{
name: "nil error and green status",
Expand Down Expand Up @@ -200,15 +193,10 @@ func TestGetUpStatus(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
metric := collector.getUpStatus(test.nodeInfo, test.err, "test_endpoint")
metricValue, err := prometheus_helper.ExtractValueFromMetric(metric)

if err != nil {
t.Errorf("Expected no error, got %v", err)
}
status := collector.getUpStatus(test.nodeInfo, test.err)

if metricValue != test.expected {
t.Errorf("Expected metric value to be %v, got %v", test.expected, metricValue)
if status != test.expected {
t.Errorf("Expected up value to be %v, got %v", test.expected, status)
}
})
}
Expand Down
Loading

0 comments on commit 85383c9

Please sign in to comment.