forked from kuskoman/logstash-exporter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
collector_manager.go
99 lines (80 loc) · 3.11 KB
/
collector_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package collectors
import (
"context"
"sync"
"time"
"golang.org/x/exp/slog"
"github.com/kuskoman/logstash-exporter/collectors/nodeinfo"
"github.com/kuskoman/logstash-exporter/collectors/nodestats"
"github.com/kuskoman/logstash-exporter/config"
logstashclient "github.com/kuskoman/logstash-exporter/fetcher/logstash_client"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
)
// Collector is the contract implemented by every sub-collector managed by
// the CollectorManager: gather metrics within the given context, send them
// to the channel, and return a non-nil error on failure.
type Collector interface {
Collect(context.Context, chan<- prometheus.Metric) (err error)
}
// CollectorManager is a collector that executes all other collectors
type CollectorManager struct {
// collectors holds the managed sub-collectors keyed by name
// (populated by getCollectors: "nodeinfo", "nodestats").
collectors map[string]Collector
// scrapeDurations records per-collector execution time, labelled
// by collector name and by result ("success"/"error").
scrapeDurations *prometheus.SummaryVec
}
// NewCollectorManager builds a manager wired to the Logstash instance at
// the given endpoint. As a side effect it registers the exporter's
// build-info collector with the default Prometheus registry, so calling
// this constructor twice in one process will panic (MustRegister).
func NewCollectorManager(endpoint string) *CollectorManager {
	client := logstashclient.NewClient(endpoint)

	manager := &CollectorManager{
		collectors:      getCollectors(client),
		scrapeDurations: getScrapeDurationsCollector(),
	}

	prometheus.MustRegister(version.NewCollector("logstash_exporter"))
	return manager
}
// getCollectors assembles the set of sub-collectors for the given client,
// keyed by the name used in the scrape-duration metric's "collector" label.
func getCollectors(client logstashclient.Client) map[string]Collector {
	return map[string]Collector{
		"nodeinfo":  nodeinfo.NewNodeinfoCollector(client),
		"nodestats": nodestats.NewNodestatsCollector(client),
	}
}
// Collect executes all collectors concurrently and sends the collected
// metrics to the provided channel. It also records each collector's
// execution duration in the scrapeDurations summary. The whole scrape is
// bounded by a shared context with config.HttpTimeout.
func (manager *CollectorManager) Collect(ch chan<- prometheus.Metric) {
	ctx, cancel := context.WithTimeout(context.Background(), config.HttpTimeout)
	defer cancel()

	waitGroup := sync.WaitGroup{}
	waitGroup.Add(len(manager.collectors))
	for name, collector := range manager.collectors {
		go func(name string, collector Collector) {
			// Done is deferred so that a panicking collector cannot
			// skip it and deadlock waitGroup.Wait() below.
			defer waitGroup.Done()
			slog.Debug("executing collector", "name", name)
			manager.executeCollector(name, ctx, collector, ch)
			slog.Debug("collector finished", "name", name)
		}(name, collector)
	}
	waitGroup.Wait()
}
// Describe sends the descriptor of the scrapeDurations summary to the
// provided channel.
// NOTE(review): the managed sub-collectors are not described here —
// presumably they act as unchecked collectors; confirm this is intentional.
func (manager *CollectorManager) Describe(ch chan<- *prometheus.Desc) {
manager.scrapeDurations.Describe(ch)
}
// executeCollector runs one collector against the shared context, measures
// how long it took, logs the outcome, and records the duration in the
// scrapeDurations summary labelled with the collector name and a
// "success"/"error" result.
// NOTE(review): by convention ctx would be the first parameter; left as-is
// to keep the internal call signature stable.
func (manager *CollectorManager) executeCollector(name string, ctx context.Context, collector Collector, ch chan<- prometheus.Metric) {
	start := time.Now()
	err := collector.Collect(ctx, ch)
	elapsed := time.Since(start)

	status := "success"
	if err != nil {
		status = "error"
		slog.Error("executor failed", "name", name, "duration", elapsed, "err", err)
	} else {
		slog.Debug("executor succeeded", "name", name, "duration", elapsed)
	}

	manager.scrapeDurations.WithLabelValues(name, status).Observe(elapsed.Seconds())
}
// getScrapeDurationsCollector builds the summary vector used to track
// per-collector scrape durations, partitioned by collector name and result.
func getScrapeDurationsCollector() *prometheus.SummaryVec {
	opts := prometheus.SummaryOpts{
		Namespace: config.PrometheusNamespace,
		Subsystem: "exporter",
		Name:      "scrape_duration_seconds",
		Help:      "logstash_exporter: Duration of a scrape job.",
	}
	return prometheus.NewSummaryVec(opts, []string{"collector", "result"})
}