Skip to content

Commit

Permalink
Add package handling the metrics. Add example metric for mask plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Sandu Kiritsa committed Mar 6, 2022
1 parent 228a048 commit 1ec090e
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 32 deletions.
11 changes: 7 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ UPSTREAM_BRANCH ?= origin/master
prepare:
docker login

.PHONY: build
build:
echo "Building..."
GOOS=linux GOARCH=amd64 go build -v -o file.d ./cmd/file.d.go

.PHONY: deps
deps:
go get -v github.com/vitkovskii/insane-doc@v0.0.1
Expand Down Expand Up @@ -51,14 +56,12 @@ profile-file:
go test -bench LightJsonReadPar ./plugin/input/file -v -count 1 -run -benchmem -benchtime 1x -cpuprofile cpu.pprof -memprofile mem.pprof -mutexprofile mutex.pprof

.PHONY: push-version-linux-amd64
push-version-linux-amd64:
GOOS=linux GOARCH=amd64 go build -v -o file.d ./cmd/file.d.go
push-version-linux-amd64: build
docker build -t ozonru/file.d:${VERSION}-linux-amd64 .
docker push ozonru/file.d:${VERSION}-linux-amd64

.PHONY: push-latest-linux-amd64
push-latest-linux-amd64:
GOOS=linux GOARCH=amd64 go build -v -o file.d ./cmd/file.d.go
push-latest-linux-amd64: build
docker build -t ozonru/file.d:latest-linux-amd64 .
docker push ozonru/file.d:latest-linux-amd64

Expand Down
6 changes: 6 additions & 0 deletions fd/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/stats"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
Expand Down Expand Up @@ -44,10 +45,15 @@ func (f *FileD) Start() {
logger.Infof("starting file.d")

f.createRegistry()
f.initMetrics()
f.startHTTP()
f.startPipelines()
}

func (f *FileD) initMetrics() {
stats.InitStats()
}

func (f *FileD) createRegistry() {
f.registry = prometheus.NewRegistry()
f.registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
Expand Down
46 changes: 18 additions & 28 deletions plugin/action/mask/mask.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/pipeline"
"github.com/prometheus/client_golang/prometheus"
"github.com/ozontech/file.d/stats"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
)
Expand All @@ -33,20 +33,17 @@ pipelines:
}*/

const (
substitution = byte('*')

metricName = "mask_plugin"
substitution = byte('*')
metricSubsystem = "mask_plugin"
timesActivated = "times_activated"
)

var MaskPromCounter = prometheus.NewCounter(prometheus.CounterOpts{})

type Plugin struct {
config *Config
sourceBuf []byte
maskBuf []byte
logMaskAppeared bool
valueNodes []*insaneJSON.Node
logger *zap.SugaredLogger
config *Config
sourceBuf []byte
maskBuf []byte
valueNodes []*insaneJSON.Node
logger *zap.SugaredLogger
}

//! config-params
Expand Down Expand Up @@ -148,26 +145,18 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP
p.valueNodes = make([]*insaneJSON.Node, 0)
p.logger = params.Logger
p.config.Masks = compileMasks(p.config.Masks, p.logger)
if p.config.MetricSubsystemName != nil {
p.logMaskAppeared = true
p.registerPluginMetrics(pipeline.PromNamespace, *p.config.MetricSubsystemName, metricName)
}
p.registerPluginMetrics()
}

func (p *Plugin) Stop() {
}

func (p *Plugin) registerPluginMetrics(namespace, subsystem, metricName string) {
// can't declare counter as property on p.counter, because multiple cores
// will create multiple metrics and all but last one will be unregistered.
MaskPromCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: metricName,
Help: "",
func (p *Plugin) registerPluginMetrics() {
stats.RegisterCounter(&stats.MetricDesc{
Name: timesActivated,
Subsystem: metricSubsystem,
Help: "Number of times mask plugin found the provided pattern",
})
prometheus.DefaultRegisterer.Unregister(MaskPromCounter)
prometheus.DefaultRegisterer.MustRegister(MaskPromCounter)
}

func appendMask(dst, src []byte, begin, end int) ([]byte, int) {
Expand Down Expand Up @@ -255,8 +244,9 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
}
v.MutateToString(string(p.maskBuf))
}
if p.logMaskAppeared && maskApplied {
MaskPromCounter.Inc()

if maskApplied {
stats.GetCounter(metricSubsystem, timesActivated).Inc()
p.logger.Infof("mask appeared to event, output string: %s", event.Root.EncodeToString())
}

Expand Down
57 changes: 57 additions & 0 deletions stats/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package stats

import (
"github.com/ozontech/file.d/pipeline"
prom "github.com/prometheus/client_golang/prometheus"
)

type MetricDesc struct {
Subsystem string
Name string
Help string
}

type key struct {
namespace string
subsystem string
name string
}

type AnyMetric prom.Collector

type stats struct {
allMetrics map[key]AnyMetric
}

var statsGlobal *stats

func InitStats() {
statsGlobal = &stats{}
statsGlobal.allMetrics = make(map[key]AnyMetric)
}

func RegisterCounter(metricDesc *MetricDesc) {
maskPromCounter := prom.NewCounter(prom.CounterOpts{
Namespace: pipeline.PromNamespace,
Subsystem: metricDesc.Subsystem,
Name: metricDesc.Name,
Help: metricDesc.Help,
})

keyInternal := key{pipeline.PromNamespace, metricDesc.Subsystem, metricDesc.Name}
statsGlobal.registerMetric(keyInternal, maskPromCounter)
}

func GetCounter(subsystem, metricName string) prom.Counter {
return statsGlobal.allMetrics[getKey(subsystem, metricName)].(prom.Counter)
}

func getKey(subsystem, metricName string) key {
return key{pipeline.PromNamespace, subsystem, metricName}
}

func (s *stats) registerMetric(k key, metric prom.Collector) {
s.allMetrics[k] = metric
prom.DefaultRegisterer.Unregister(metric)
prom.DefaultRegisterer.MustRegister(metric)
}
2 changes: 2 additions & 0 deletions test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin/input/fake"
"github.com/ozontech/file.d/plugin/output/devnull"
"github.com/ozontech/file.d/stats"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -94,6 +95,7 @@ func WaitForEvents(x *atomic.Int32) {
}

func NewPipeline(actions []*pipeline.ActionPluginStaticInfo, pipelineOpts ...string) *pipeline.Pipeline {
stats.InitStats()
parallel := Opts(pipelineOpts).Has("parallel")
perf := Opts(pipelineOpts).Has("perf")
mock := Opts(pipelineOpts).Has("mock")
Expand Down

0 comments on commit 1ec090e

Please sign in to comment.