forked from CiscoDevNet/bigmuddy-network-telemetry-pipeline
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetrics_influx.go
503 lines (440 loc) · 13.1 KB
/
metrics_influx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
package main
//
// Notes:
//
// The life of the router is to pull its dataMsg channel, and feed it it
// to one of a number of worker queues.
//
// The life of a worker is to pull messages from it's queue, and for each,
// build metrics.
//
// A worker handles produceMetrics callbacks as follows:
// On build metric;
// we simply accumulate metricsAtoms
// On flushMetric, we add take the tags, and the accumulated metrics, and
// build a point.
// On return from produceMetrics we write the batch (and dump to file if
// logging diagnostics).
//
// Protagonists:
// metricsInfluxOutputHandler
// dataChanRouter (one router feeding multiple workers)
// metricsInfluxOutputWorker (multiple)
//
import (
"bufio"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/influxdata/influxdb/client/v2"
"os"
"runtime"
"time"
)
const (
// Timeout waiting to enqueue message. If queues aren't drained in
// this time (coupled with buf channel to absorb transients), then
// we're toast.
METRICS_INFLUX_TIMEOUT_ENQUEUE_SECONDS = 2
METRICS_INFLUX_WAIT_TO_RECONNECT_SECONDS = 2
//
// An estimate of fields per point allows us to setup
// slices with a capacity which minimise reallocation,
// without over allocating to omuch
METRICS_INFLUX_FIELDS_PER_POINT_ESTIMATE = 32
)
type metricsInfluxOutputHandler struct {
influxServer string
database string
consistency string
retention string
standalone bool
auth userPasswordCollector
//
// Workers are fed through a channel (of dataChannelDepth)
// by the dataMsgRouter. lastWorker is used to track the
// last worker used.
workers int
router *dataMsgRouter
lastWorker int
dataChannelDepth int
//
// metricsfilename allow for diagnostic dump of metrics as
// exported to InfluxDB
metricsfilename string
//
// Logging context, built once, and reused.
logctx *log.Entry
}
//
// metricsInfluxOutputWorker handles (sub)set of events and translates
// them into measurement POSTs to Influxdb, working completely
// autonomously from any other workers if present.
type metricsInfluxOutputWorker struct {
influxServer string
wkid int
influxOutputHandler *metricsInfluxOutputHandler
dataChan chan dataMsg
metricsfilename string
logctx *log.Entry
}
type metricsInfluxOutputContext struct {
name string
fields []*metricsAtom
bp client.BatchPoints
}
func (w *metricsInfluxOutputWorker) worker(m *metricsOutputModule) {
var metricsfile *os.File
var error_tag string
var dump *bufio.Writer
var influxClient client.Client
var metadata *dataMsgMetaData
var err error
var bp client.BatchPoints
w.logctx.Debug("Run worker")
defer m.shutdownSyncPoint.Done()
if w.metricsfilename != "" {
metricsfile, dump = metricsSetupDumpfile(
w.metricsfilename, w.logctx)
if metricsfile != nil {
defer metricsfile.Close()
}
}
outputHandler := w.influxOutputHandler
// Add the client batch configuration once, and reuse it
// for every batch we add.
batchCfg := client.BatchPointsConfig{
Database: outputHandler.database,
Precision: "ms",
RetentionPolicy: outputHandler.retention,
WriteConsistency: outputHandler.consistency,
}
for {
// Any failure, other than explicitly closed channels
// indicating we're shutting down, causes us to go back to go,
// collect £200 and try again.
//
// Add tls config here.
if !outputHandler.standalone {
var user, passw string
user, passw, err = outputHandler.auth.getUP()
if err == nil {
influxClient, err = client.NewHTTPClient(client.HTTPConfig{
Addr: w.influxServer, Username: user, Password: passw,
})
}
}
if err != nil {
// Wait, and come round again
w.logctx.WithError(err).Error("connect to influx node (will retry)")
time.Sleep(
time.Duration(METRICS_INFLUX_WAIT_TO_RECONNECT_SECONDS) * time.Second)
continue
}
if outputHandler.standalone {
w.logctx.Debug("Running in standalone mode (dumping points to file)")
} else {
w.logctx.Debug("Connected to influx node")
}
// Ok, we connected. Lets get into the main loop.
for {
msg, ok := <-w.dataChan
if !ok {
w.logctx.Debug("Closed worker")
if !outputHandler.standalone {
influxClient.Close()
}
return
}
bp, err = client.NewBatchPoints(batchCfg)
if err != nil {
// Break out of the loop and start worker over. We
// failed to get a new batch.
error_tag = "failed to create batch point"
break
}
metadata = msg.getMetaData()
context := &metricsInfluxOutputContext{
name: metadata.Path,
fields: nil,
bp: bp,
}
err = msg.produceMetrics(&m.inputSpec, m.outputHandler, context)
if err != nil {
metricsStatMonitor.basePathMetricsError.WithLabelValues(
m.name, metadata.Identifier, metadata.Path,
"produce failed").Inc()
continue
}
// If no metrics produced - perfectly valid, continue
pts := bp.Points()
if len(pts) == 0 {
continue
}
// Dump metrics if doing diagnostics. Costly, of course.
if dump != nil {
dump.WriteString(fmt.Sprintf(
"Server: [%s], wkid %d, writing %d points in db: %s\n"+
"(prec: [%s], consistency: [%s], retention: [%s])\n",
w.influxServer,
w.wkid,
len(pts),
bp.Database(),
bp.Precision(),
bp.WriteConsistency(),
bp.RetentionPolicy()))
for _, pt := range pts {
//
// Start with a simple dump. Might need to extend a little.
dump.WriteString(fmt.Sprintf("\t%s\n", pt.String()))
}
}
if !outputHandler.standalone {
err = influxClient.Write(context.bp)
}
if err != nil {
error_tag = "failed to write batch point"
break
}
}
//
// We would be here on error.
metricsStatMonitor.basePathMetricsError.WithLabelValues(
m.name, metadata.Identifier, metadata.Path,
error_tag).Inc()
//
// Close existing client.
if !outputHandler.standalone {
influxClient.Close()
}
//
// It might be too noisy to log error here. We may need to
// consider dampening and relying on exported metric
w.logctx.WithError(err).WithFields(log.Fields{
"error_tag": error_tag}).Error(
"exit loop handling messages, will reconnect")
time.Sleep(
time.Duration(METRICS_INFLUX_WAIT_TO_RECONNECT_SECONDS) * time.Second)
}
}
func (o *metricsInfluxOutputHandler) setupWorkers(m *metricsOutputModule) {
//
// Setup as many workers with their own context as necessary. We
// also route to the various workers using a dedicated
// router. This will be generalised.
//
// Assuming we are picking up off the pub/sub bus, we could be
// splitting the load by kafka partition already, and this
// might be one instance of a group of pipelines operating as
// a consumer group.
//
var dumpfilename string
o.logctx.Info("Setting up workers")
o.router = &dataMsgRouter{
dataChanIn: m.dataChan,
shutdownChan: m.shutdownChan,
dataChansOut: make([]chan dataMsg, o.workers),
logctx: o.logctx,
route: func(msg dataMsg, attempts int) int {
//
// We start with simple round robin algorithm.
o.lastWorker++
o.lastWorker %= o.workers
return o.lastWorker
},
handleCongested: func(
msg dataMsg, attempts int, worker int) dataMsgRouterCongestionAction {
metadata := msg.getMetaData()
// Reroute to another worker.
if attempts < o.workers {
metricsStatMonitor.basePathMetricsError.WithLabelValues(
m.name, metadata.Identifier, metadata.Path,
"congested worker (rerouted)").Inc()
return DATAMSG_ROUTER_REROUTE
}
//
// Give up and drop.
metricsStatMonitor.basePathMetricsError.WithLabelValues(
m.name, metadata.Identifier, metadata.Path,
"congested worker (dropped)").Inc()
return DATAMSG_ROUTER_DROP
},
// We do not really use the timeout. Behaviour is currently to
// hunt for worker whcih can take message or drop.
timeout: time.Duration(METRICS_INFLUX_TIMEOUT_ENQUEUE_SECONDS) * time.Second,
}
//
// Inherit channel depth for workers too.
o.dataChannelDepth = m.dataChannelDepth
for i := 0; i < o.workers; i++ {
o.router.dataChansOut[i] = make(chan dataMsg, o.dataChannelDepth)
m.shutdownSyncPoint.Add(1)
if o.metricsfilename != "" {
dumpfilename = fmt.Sprintf(
"%s_wkid%d", o.metricsfilename, i)
} else {
dumpfilename = ""
}
w := &metricsInfluxOutputWorker{
influxServer: o.influxServer,
wkid: i,
influxOutputHandler: o,
dataChan: o.router.dataChansOut[i],
metricsfilename: dumpfilename,
logctx: o.logctx.WithFields(log.Fields{"wkid": i}),
}
go w.worker(m)
}
//
// Kick off router to start collecting and routing messages to
// workers.
go o.router.run()
}
//
// Simply colllect the metric atoms at this stage for Influx. We use
// the flush to assemble a new point, and clear the list.
func (o *metricsInfluxOutputHandler) buildMetric(
tags []metricsAtom, sensor metricsAtom, ts uint64,
context metricsOutputContext) {
c := context.(*metricsInfluxOutputContext)
if c.fields == nil {
c.fields = make([]*metricsAtom, 0, METRICS_INFLUX_FIELDS_PER_POINT_ESTIMATE)
}
//
// Rewrite accursed uint64:
//
// fmt.Printf(" uint64 [%v] -> float64 [%v]\n",
// uint64(math.MaxUint64),
// float64(math.MaxUint64))
//
// uint64 [18446744073709551615] -> float64 [1.8446744073709552e+19]
//
switch sensor.val.(type) {
case uint64:
sensor.val = float64(sensor.val.(uint64))
default:
// nothing to do!
}
c.fields = append(c.fields, &sensor)
}
func (o *metricsInfluxOutputHandler) flushMetric(
tag_atoms []metricsAtom, ts uint64, context metricsOutputContext) {
c := context.(*metricsInfluxOutputContext)
if c.fields != nil {
fields := map[string]interface{}{}
for _, field_atom := range c.fields {
fields[field_atom.key] = field_atom.val
}
tags := map[string]string{}
for _, tag_atom := range tag_atoms {
tags[tag_atom.key] = fmt.Sprintf("%v", tag_atom.val)
}
pt, err := client.NewPoint(c.name, tags, fields,
time.Unix(0, int64(ts)*1000*1000))
if err == nil {
c.bp.AddPoint(pt)
} else {
//
// Unexpected failure.
o.logctx.WithFields(
log.Fields{"base_path": c.name}).WithError(err).Error(
"adding point to batch")
}
//
// Finish by clearing the c.fields
c.fields = nil
}
}
func (o *metricsInfluxOutputHandler) adaptSensorName(name string) string {
return name
}
func (o *metricsInfluxOutputHandler) adaptTagName(name string) string {
return name
}
//
// Process the configuration to extract whatever is needed.
func metricsInfluxNew(name string, nc nodeConfig) (metricsOutputHandler, error) {
var err error
var authCollect userPasswordCollector
var standalone bool
logctx := logger.WithFields(log.Fields{
"name": name,
"xport_type": "influx",
})
// If not set, will default to false
metricsfilename, _ := nc.config.GetString(name, "dump")
influxServer, _ := nc.config.GetString(name, "influx")
if influxServer == "" {
if metricsfilename == "" {
err = fmt.Errorf(
"attribute 'influx' required for influx metric export. " +
"Specify URL of the form " +
"http://[ipv6-host%%zone]:port or " +
"http://influx.example.com:port. " +
"Alternatively specify 'dump' option to just log points.")
logctx.WithError(err).Error("insufficient configuration")
return nil, err
} else {
standalone = true
}
} else {
logctx = logctx.WithFields(log.Fields{"influx": influxServer})
}
//
// TODO: Add TLS support by pulling in TLS config at this point.
database, err := nc.config.GetString(name, "database")
if err != nil {
logctx.WithError(err).Error(
"attribute 'database' required for influx metric export. " +
" For a database created with 'CREATE DATABASE <name>', " +
"this setting would be 'database=<name>'")
return nil, err
}
logctx = logctx.WithFields(log.Fields{"database": database})
// Handle user/password
if !standalone {
authCollect = influxUPCollectorFactory()
err = authCollect.handleConfig(nc, name, influxServer)
if err != nil {
logctx.WithError(err).Error(
"failed to collect credentials required for influx")
return nil, err
}
}
//
// One could imagine templatising these in exactly the same way as
// we templatise deriving topic in kafka. Future.
consistency, _ := nc.config.GetString(name, "consistency")
retention, _ := nc.config.GetString(name, "retention")
workers, _ := nc.config.GetInt(name, "workers")
if workers == 0 {
workers = 1
} else if workers > runtime.GOMAXPROCS(-1) {
//
// Excessive number of workers... cut back
workers = runtime.GOMAXPROCS(-1)
}
logctx = logctx.WithFields(log.Fields{"workers": workers})
//
// Populate the influx output context
out := &metricsInfluxOutputHandler{
influxServer: influxServer,
auth: authCollect,
database: database,
consistency: consistency,
retention: retention,
workers: workers,
standalone: standalone,
logctx: logctx,
metricsfilename: metricsfilename,
}
return out, nil
}
var influxUPCollectorFactory userPasswordCollectorFactory
//
// We use init to setup the user/password collection function. This
// can be overwritten by test.
func init() {
influxUPCollectorFactory = func() userPasswordCollector {
return &cryptUserPasswordCollector{}
}
}