Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
ansakharov committed Mar 10, 2022
2 parents 0dd4ee3 + 2cac467 commit 83a0cf1
Show file tree
Hide file tree
Showing 36 changed files with 555 additions and 107 deletions.
27 changes: 25 additions & 2 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,41 @@ env:
- CGO_ENABLED=0
checksum:
name_template: '{{ .ProjectName }}_checksums.txt'
changelog:
skip: true
release:
draft: true
prerelease: auto

archives:
- replacements:
darwin: Darwin
linux: Linux
amd64: x86_64

builds:
- main: ./cmd
goos:
- linux
- darwin
goarch:
- amd64
- arm64
mod_timestamp: '{{ .CommitTimestamp }}'
flags:
- -trimpath
nfpms:
-
maintainer: Vitkovskiy Vladimir <vvitkovskiy@ozon.ru>
formats:
- deb
description: |-
file.d is a tool for building data pipelines:
read, process and output events.
homepage: https://ozontech.github.io/file.d/
vendor: Ozon Tech
bindir: /usr/bin
contents:
- src: ./releaser/config.yaml
dst: /etc/file.d/config.yaml
type: config
- src: ./releaser/file.d.service
dst: /etc/systemd/system/file.d.service
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ WORKDIR /file.d

COPY ./file.d .

CMD ./file.d
CMD [ "./file.d" ]
11 changes: 7 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ UPSTREAM_BRANCH ?= origin/master
prepare:
docker login

.PHONY: build
build:
echo "Building..."
GOOS=linux GOARCH=amd64 go build -v -o file.d ./cmd/file.d.go

.PHONY: deps
deps:
go get -v github.com/vitkovskii/insane-doc@v0.0.1
Expand Down Expand Up @@ -51,14 +56,12 @@ profile-file:
go test -bench LightJsonReadPar ./plugin/input/file -v -count 1 -run -benchmem -benchtime 1x -cpuprofile cpu.pprof -memprofile mem.pprof -mutexprofile mutex.pprof

.PHONY: push-version-linux-amd64
push-version-linux-amd64:
GOOS=linux GOARCH=amd64 go build -v -o file.d ./cmd/file.d.go
push-version-linux-amd64: build
docker build -t ozonru/file.d:${VERSION}-linux-amd64 .
docker push ozonru/file.d:${VERSION}-linux-amd64

.PHONY: push-latest-linux-amd64
push-latest-linux-amd64:
GOOS=linux GOARCH=amd64 go build -v -o file.d ./cmd/file.d.go
push-latest-linux-amd64: build
docker build -t ozonru/file.d:latest-linux-amd64 .
docker push ozonru/file.d:latest-linux-amd64

Expand Down
6 changes: 6 additions & 0 deletions fd/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/stats"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
Expand Down Expand Up @@ -44,10 +45,15 @@ func (f *FileD) Start() {
logger.Infof("starting file.d")

f.createRegistry()
f.initMetrics()
f.startHTTP()
f.startPipelines()
}

// initMetrics prepares metrics collection by delegating to stats.InitStats.
// Start calls it after createRegistry and before startHTTP.
func (f *FileD) initMetrics() {
stats.InitStats()
}

func (f *FileD) createRegistry() {
f.registry = prometheus.NewRegistry()
f.registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
Expand Down
17 changes: 3 additions & 14 deletions offset/offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ func (o *Offset) Load() error {
}
defer file.Close()

if err := o.Callback.Load(file); err != nil {
return err
}
return nil
return o.Callback.Load(file)
}

func (o *Offset) getTmpPath() string {
Expand All @@ -46,20 +43,12 @@ func (o *Offset) saveToTmp() error {
return err
}
defer file.Close()
if err := o.Callback.Save(file); err != nil {
return err
}

return nil
return o.Callback.Save(file)
}

// Save persists the offset in two steps: it writes the data to a temporary
// file via saveToTmp and then renames that file (getTmpPath) onto o.path.
// It returns the first error encountered, or nil when both steps succeed.
func (o *Offset) Save() error {
	if err := o.saveToTmp(); err != nil {
		return err
	}

	// The rename result is the final outcome of Save, so return it directly
	// instead of re-checking it and leaving an unreachable duplicate return.
	return os.Rename(o.getTmpPath(), o.path)
}
5 changes: 1 addition & 4 deletions offset/simple_offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ func (o *yamlValue) Load(r io.Reader) error {
if err != nil {
return err
}
if err := yaml.Unmarshal(b, o.value); err != nil {
return err
}
return nil
return yaml.Unmarshal(b, o.value)
}

func (o *yamlValue) Save(w io.Writer) error {
Expand Down
6 changes: 2 additions & 4 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,8 @@ func (b *Batcher) work(ctx context.Context) {

func (b *Batcher) commitBatch(ctx context.Context, events []*Event, batch *Batch) []*Event {
// we need to release batch first and then commit events
// so lets exchange local slice with batch slice to avoid data copying
tmp := events
events = batch.Events
batch.Events = tmp
// so let's swap the local slice with the batch slice to avoid data copying
events, batch.Events = batch.Events, events

batchSeq := batch.seq

Expand Down
1 change: 0 additions & 1 deletion pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ type processor struct {
actionWatcher *actionWatcher
recoverFromPanic func()

heartbeatCh chan *stream
metricsValues []string
}

Expand Down
4 changes: 1 addition & 3 deletions pipeline/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
// e.g. events from same file will be in same stream for "file" input plugin
// todo: remove dependency on streamer
type stream struct {
chargeIndex int
blockIndex int
len int
currentSeq uint64
Expand All @@ -23,7 +22,6 @@ type stream struct {

name StreamName
sourceID SourceID
sourceName string
streamer *streamer
blockTime time.Time

Expand Down Expand Up @@ -208,8 +206,8 @@ func (s *stream) get() *Event {
s.first = s.first.next
}

s.awaySeq = event.SeqID
if event != nil {
s.awaySeq = event.SeqID
event.stage = eventStageProcessor
s.len--
}
Expand Down
33 changes: 27 additions & 6 deletions pipeline/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,40 @@ import (
"unsafe"
)

// ByteToStringUnsafe reinterprets b as a string without copying its bytes.
// The returned string shares memory with b, so it is effectively mutable:
// use with caution and never modify the provided byte slice afterwards.
func ByteToStringUnsafe(b []byte) string {
	ptr := unsafe.Pointer(&b)
	return *(*string)(ptr)
}

// StringToByteUnsafe reinterprets s as a byte slice without copying its data.
// The returned slice has len and cap equal to len(s) and shares memory with s.
// Strings are immutable, so this is unsafe: never modify the resulting slice.
func StringToByteUnsafe(s string) []byte {
	var buf []byte

	// Fill in the header of a real slice variable instead of declaring a
	// free-standing reflect.SliceHeader: the unsafe package documentation only
	// permits SliceHeader/StringHeader when they point at an actual slice or
	// string, otherwise the Data field does not keep the memory referenced.
	strHdr := (*reflect.StringHeader)(unsafe.Pointer(&s))
	bufHdr := (*reflect.SliceHeader)(unsafe.Pointer(&buf))
	bufHdr.Data = strHdr.Data
	bufHdr.Len = strHdr.Len
	bufHdr.Cap = strHdr.Len

	return buf
}

/* There are actually a lot of interesting ways to do this. Saving them here, for the purpose of possible debugging.
func StringToByteUnsafe(s string) []byte {
const max = 0x7fff0000
if len(s) > max {
panic("string too long")
}
return (*[max]byte)(unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data))[:len(s):len(s)]
}
func StringToByteUnsafe(s string) (b []byte) {
bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
bh.Data = sh.Data
bh.Cap = sh.Len
bh.Len = sh.Len
return b
}
*/

const formats = "ansic|unixdate|rubydate|rfc822|rfc822z|rfc850|rfc1123|rfc1123z|rfc3339|rfc3339nano|kitchen|stamp|stampmilli|stampmicro|stampnano"

func ParseFormatName(formatName string) (string, error) {
Expand Down
35 changes: 14 additions & 21 deletions plugin/action/mask/mask.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/pipeline"
"github.com/prometheus/client_golang/prometheus"
"github.com/ozontech/file.d/stats"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
)
Expand All @@ -33,20 +33,17 @@ pipelines:
}*/

const (
substitution = byte('*')

metricName = "mask_plugin"
substitution = byte('*')
timesActivated = "times_activated"
)

var MaskPromCounter = prometheus.NewCounter(prometheus.CounterOpts{})

type Plugin struct {
config *Config
sourceBuf []byte
maskBuf []byte
logMaskAppeared bool
valueNodes []*insaneJSON.Node
logger *zap.SugaredLogger
logMaskAppeared bool
}

//! config-params
Expand Down Expand Up @@ -150,24 +147,19 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP
p.config.Masks = compileMasks(p.config.Masks, p.logger)
if p.config.MetricSubsystemName != nil {
p.logMaskAppeared = true
p.registerPluginMetrics(pipeline.PromNamespace, *p.config.MetricSubsystemName, metricName)
p.registerPluginMetrics()
}
}

func (p *Plugin) Stop() {
// registerPluginMetrics registers the timesActivated counter with the stats
// package, using the subsystem name taken from the plugin config.
// It dereferences p.config.MetricSubsystemName, so that field must be non-nil.
func (p *Plugin) registerPluginMetrics() {
	desc := stats.MetricDesc{
		Name:      timesActivated,
		Subsystem: *p.config.MetricSubsystemName,
		Help:      "Number of times mask plugin found the provided pattern",
	}
	stats.RegisterCounter(&desc)
}

func (p *Plugin) registerPluginMetrics(namespace, subsystem, metricName string) {
// can't declare counter as property on p.counter, because multiple cores
// will create multiple metrics and all but last one will be unregistered.
MaskPromCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: metricName,
Help: "",
})
prometheus.DefaultRegisterer.Unregister(MaskPromCounter)
prometheus.DefaultRegisterer.MustRegister(MaskPromCounter)
func (p *Plugin) Stop() {
}

func appendMask(dst, src []byte, begin, end int) ([]byte, int) {
Expand Down Expand Up @@ -255,8 +247,9 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
}
v.MutateToString(string(p.maskBuf))
}

if p.logMaskAppeared && maskApplied {
MaskPromCounter.Inc()
stats.GetCounter(*p.config.MetricSubsystemName, timesActivated).Inc()
p.logger.Infof("mask appeared to event, output string: %s", event.Root.EncodeToString())
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/action/parse_re2/parse_re2.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
fields := p.re.SubexpNames()
var bl int
for i := 1; i < len(fields); i++ {
if len(fields[i]) == 0 {
if fields[i] == "" {
continue
}

Expand Down
6 changes: 0 additions & 6 deletions plugin/action/throttle/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,6 @@ func (l *limiter) isAllowed(event *pipeline.Event, ts time.Time) bool {
return l.buckets[index] <= l.limit.value
}

// bucketIDToTime converts bucketID to time. This time is start of the bucket.
// The bucket ID multiplied by the limiter interval is the number of
// nanoseconds since the Unix epoch, i.e. the inverse of timeToBucketID.
func (l *limiter) bucketIDToTime(id int) time.Time {
	nano := int64(id) * l.interval.Nanoseconds()
	// time.Unix accepts nsec outside [0, 999999999], so the whole value can be
	// passed as nanoseconds; this avoids splitting by a hand-written divisor
	// (the previous 100000000 was 1e8, not the 1e9 nanoseconds in a second).
	return time.Unix(0, nano)
}

// timeToBucketID converts time to bucketID.
func (l *limiter) timeToBucketID(t time.Time) int {
return int(t.UnixNano() / l.interval.Nanoseconds())
Expand Down
18 changes: 18 additions & 0 deletions plugin/input/dmesg/dmesg.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,17 @@ import (
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/offset"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/stats"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
)

const (
// subsystemName is the metrics subsystem used when registering and
// looking up this plugin's counters.
subsystemName = "input_dmesg"

// offsetErrors is the name of the counter incremented when loading or
// saving the offset file fails.
offsetErrors = "offset_errors"
)

/*{ introduction
It reads kernel events from /dev/kmsg
}*/
Expand Down Expand Up @@ -56,9 +63,11 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
p.logger = params.Logger
p.config = config.(*Config)
p.controller = params.Controller
p.registerPluginMetrics()

p.state = &state{}
if err := offset.LoadYAML(p.config.OffsetsFile, p.state); err != nil {
stats.GetCounter(subsystemName, offsetErrors).Inc()
p.logger.Error("can't load offset file: %s", err.Error())
}

Expand All @@ -72,6 +81,14 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
longpanic.Go(p.read)
}

// registerPluginMetrics registers the offsetErrors counter with the stats
// package under the plugin's subsystemName.
func (p *Plugin) registerPluginMetrics() {
	desc := stats.MetricDesc{
		Name:      offsetErrors,
		Subsystem: subsystemName,
		Help:      "Number of errors occurred when saving/loading offset",
	}
	stats.RegisterCounter(&desc)
}

func (p *Plugin) read() {
root := insaneJSON.Spawn()
defer insaneJSON.Release(root)
Expand Down Expand Up @@ -115,6 +132,7 @@ func (p *Plugin) Commit(event *pipeline.Event) {
p.state.TS = event.Offset

if err := offset.SaveYAML(p.config.OffsetsFile, p.state); err != nil {
stats.GetCounter(subsystemName, offsetErrors).Inc()
p.logger.Error("can't save offset file: %s", err.Error())
}
}
Loading

0 comments on commit 83a0cf1

Please sign in to comment.