Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
ansakharov committed Aug 18, 2022
2 parents c2ad065 + 2f1b918 commit 7b68338
Showing 54 changed files with 1,816 additions and 1,044 deletions.
34 changes: 24 additions & 10 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -12,19 +12,33 @@ jobs:
goreleaser:
runs-on: ubuntu-latest
steps:
-
name: Checkout
uses: actions/checkout@v2
- name: Checkout code
uses: actions/checkout@v3
with:
fetch-depth: 0
-
name: Set up Go
uses: actions/setup-go@v2

- name: Install Go
uses: actions/setup-go@v3
with:
go-version: 1.18

- name: Get Go environment
id: go-env
run: |
echo "::set-output name=cache::$(go env GOCACHE)"
echo "::set-output name=modcache::$(go env GOMODCACHE)"
- name: Set up cache
uses: actions/cache@v3
with:
go-version: 1.17
-
name: Run GoReleaser
uses: goreleaser/goreleaser-action@v2
path: |
${{ steps.go-env.outputs.cache }}
${{ steps.go-env.outputs.modcache }}
key: release-${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
release-${{ runner.os }}-go-
- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v3
with:
distribution: goreleaser
version: latest
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ bench-file:

.PHONY: gen-doc
gen-doc:
@go install github.com/vitkovskii/insane-doc@latest
@go install github.com/vitkovskii/insane-doc@v0.0.2
@~/go/bin/insane-doc

.PHONY: profile-file
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ TBD: throughput on production servers.

**Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [journalctl](plugin/input/journalctl/README.md), [k8s](plugin/input/k8s/README.md), [kafka](plugin/input/kafka/README.md)

**Action**: [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [throttle](plugin/action/throttle/README.md)
**Action**: [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [throttle](plugin/action/throttle/README.md)

**Output**: [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [postgres](plugin/output/postgres/README.md), [s3](plugin/output/s3/README.md), [splunk](plugin/output/splunk/README.md), [stdout](plugin/output/stdout/README.md)

1 change: 1 addition & 0 deletions _sidebar.md
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@
- [discard](plugin/action/discard/README.md)
- [flatten](plugin/action/flatten/README.md)
- [join](plugin/action/join/README.md)
- [join_template](plugin/action/join_template/README.md)
- [json_decode](plugin/action/json_decode/README.md)
- [json_encode](plugin/action/json_encode/README.md)
- [keep_fields](plugin/action/keep_fields/README.md)
14 changes: 8 additions & 6 deletions cfg/config.go
Original file line number Diff line number Diff line change
@@ -259,12 +259,14 @@ func Parse(ptr interface{}, values map[string]int) error {
// it isn't just a recursion
// it also captures values with the same name from parent
// i.e. take this config:
// {
// "T": 10,
// "Child": { // has `child:true` in a tag
// "T": null
// }
// }
//
// {
// "T": 10,
// "Child": { // has `child:true` in a tag
// "T": null
// }
// }
//
// this function will set `config.Child.T = config.T`
// see file.d/cfg/config_test.go:TestHierarchy for an example
func ParseChild(parent reflect.Value, v reflect.Value, values map[string]int) error {
1 change: 1 addition & 0 deletions cmd/file.d.go
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ import (
_ "github.com/ozontech/file.d/plugin/action/discard"
_ "github.com/ozontech/file.d/plugin/action/flatten"
_ "github.com/ozontech/file.d/plugin/action/join"
_ "github.com/ozontech/file.d/plugin/action/join_template"
_ "github.com/ozontech/file.d/plugin/action/json_decode"
_ "github.com/ozontech/file.d/plugin/action/json_encode"
_ "github.com/ozontech/file.d/plugin/action/keep_fields"
2 changes: 2 additions & 0 deletions cmd/file.d_test.go
Original file line number Diff line number Diff line change
@@ -155,8 +155,10 @@ func runWriter(tempDir string, files int) {

/*
Plugins registered automatically after importing by init() function:
_ "github.com/ozontech/file.d/plugin/output/devnull"
_ "github.com/ozontech/file.d/plugin/output/elasticsearch"
Moving plugin in sub dir in plugin dir will quit registration quietly.
To prevent this let's check that DefaultPluginRegistry contains all plugins.
Plugins "dmesg", "journalctl" linux based, they contain tag: //go:build linux.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/ozontech/file.d

go 1.17
go 1.18

require (
github.com/Masterminds/squirrel v1.5.2
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -226,7 +226,6 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/imdario/mergo v0.3.7 h1:Y+UAYTZ7gDEuOfhxKWy+dvb5dRQ6rJjFSdX2HZY1/gI=
github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0=
github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo=
github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
@@ -247,7 +246,6 @@ github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5W
github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A=
github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78=
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA=
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg=
29 changes: 14 additions & 15 deletions pipeline/event.go
Original file line number Diff line number Diff line change
@@ -195,25 +195,24 @@ func (e *Event) String() string {
type eventPool struct {
capacity int

avgEventSize int
freeEventsCount int
getCounter atomic.Int64
backCounter atomic.Int64
events []*Event
free1 []atomic.Bool
free2 []atomic.Bool
avgEventSize int
inUseEvents atomic.Int64
getCounter atomic.Int64
backCounter atomic.Int64
events []*Event
free1 []atomic.Bool
free2 []atomic.Bool

getMu *sync.Mutex
getCond *sync.Cond
}

func newEventPool(capacity, avgEventSize int) *eventPool {
eventPool := &eventPool{
avgEventSize: avgEventSize,
capacity: capacity,
freeEventsCount: capacity,
getMu: &sync.Mutex{},
backCounter: *atomic.NewInt64(int64(capacity)),
avgEventSize: avgEventSize,
capacity: capacity,
getMu: &sync.Mutex{},
backCounter: *atomic.NewInt64(int64(capacity)),
}

eventPool.getCond = sync.NewCond(eventPool.getMu)
@@ -260,7 +259,7 @@ func (p *eventPool) get() *Event {
event := p.events[x]
p.events[x] = nil
p.free2[x].Store(false)

p.inUseEvents.Inc()
event.reset(p.avgEventSize)
return event
}
@@ -290,16 +289,16 @@ func (p *eventPool) back(event *Event) {
tries = 0
}
}

p.events[x] = event
p.free1[x].Store(true)
p.inUseEvents.Dec()
p.getCond.Broadcast()
}

func (p *eventPool) dump() string {
out := logger.Cond(len(p.events) == 0, logger.Header("no events"), func() string {
o := logger.Header("events")
for i := 0; i < p.freeEventsCount; i++ {
for i := 0; i < p.capacity; i++ {
event := p.events[i]
eventStr := event.String()
if eventStr == "" {
40 changes: 32 additions & 8 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
@@ -12,13 +12,14 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/ozontech/file.d/decoder"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"go.uber.org/zap"
)

const (
@@ -43,6 +44,8 @@ const (
outputEventsSizeMetric = "output_events_size"
readOpsEventsSizeMetric = "read_ops_count"
maxEventSizeExceeded = "max_event_size_exceeded"
eventPoolCapacity = "event_pool_capacity"
inUseEventsMetric = "event_pool_in_use_events"

wrongEventCRIFormatMetric = "wrong_event_cri_format"
)
@@ -153,6 +156,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli
}

pipeline.registerMetrics()
pipeline.setDefaultMetrics()

switch settings.Decoder {
case "json":
@@ -185,6 +189,16 @@ func (p *Pipeline) subsystemName() string {
}

func (p *Pipeline) registerMetrics() {
metric.RegisterGauge(&metric.MetricDesc{
Subsystem: p.subsystemName(),
Name: inUseEventsMetric,
Help: "Count of pool events which is used for processing",
})
metric.RegisterGauge(&metric.MetricDesc{
Subsystem: p.subsystemName(),
Name: eventPoolCapacity,
Help: "Pool capacity value",
})
metric.RegisterCounter(&metric.MetricDesc{
Subsystem: p.subsystemName(),
Name: inputEventsCountMetric,
@@ -222,6 +236,10 @@ func (p *Pipeline) registerMetrics() {
})
}

func (p *Pipeline) setDefaultMetrics() {
metric.GetGauge(p.subsystemName(), eventPoolCapacity).Set(float64(p.settings.Capacity))
}

// SetupHTTPHandlers creates handlers for plugin endpoints and pipeline info.
// Plugin endpoints can be accessed via
// URL `/pipelines/<pipeline_name>/<plugin_index_in_config>/<plugin_endpoint>`.
@@ -350,6 +368,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
p.inputSize.Add(int64(length))

event := p.eventPool.get()

var dec decoder.DecoderType
if p.decoder == decoder.AUTO {
dec = p.suggestedDecoder
@@ -570,20 +589,20 @@ type deltas struct {
func (p *Pipeline) logChanges(myDeltas *deltas) {
inputSize := p.inputSize.Load()
inputEvents := p.inputEvents.Load()
inUseEvents := p.eventPool.inUseEvents.Load()

interval := p.settings.MaintenanceInterval
rate := int(myDeltas.deltaInputEvents * float64(time.Second) / float64(interval))
rateMb := myDeltas.deltaInputSize * float64(time.Second) / float64(interval) / 1024 / 1024
readOps := int(myDeltas.deltaReads * float64(time.Second) / float64(interval))
tc := int64(math.Max(float64(inputSize), 1))

p.logger.Infof(`%q pipeline stats interval=%ds, active procs=%d/%d, queue=%d/%d, out=%d|%.1fMb,`+
`rate=%d/s|%.1fMb/s, read ops=%d/s, total=%d|%.1fMb, avg size=%d, max size=%d, pool fullness=%d/%d`,
p.logger.Infof(`%q pipeline stats interval=%ds, active procs=%d/%d, events outside pool=%d/%d, events in pool=%d/%d, out=%d|%.1fMb,`+
`rate=%d/s|%.1fMb/s, read ops=%d/s, total=%d|%.1fMb, avg size=%d, max size=%d`,
p.Name, interval/time.Second, p.activeProcs.Load(), p.procCount.Load(),
p.settings.Capacity-p.eventPool.freeEventsCount, p.settings.Capacity,
inUseEvents, p.settings.Capacity, p.settings.Capacity-int(inUseEvents), p.settings.Capacity,
int64(myDeltas.deltaInputEvents), float64(myDeltas.deltaInputSize)/1024.0/1024.0, rate, rateMb, readOps,
inputEvents, float64(inputSize)/1024.0/1024.0, inputSize/tc, p.maxSize,
p.eventPool.capacity-p.eventPool.freeEventsCount, p.eventPool.capacity)
inputEvents, float64(inputSize)/1024.0/1024.0, inputSize/tc, p.maxSize)
}

func (p *Pipeline) incMetrics(inputEvents, inputSize, outputEvents, outputSize, reads *DeltaWrapper) *deltas {
@@ -610,6 +629,10 @@ func (p *Pipeline) incMetrics(inputEvents, inputSize, outputEvents, outputSize,
return myDeltas
}

func (p *Pipeline) setMetrics(inUseEvents atomic.Int64) {
metric.GetGauge(p.subsystemName(), inUseEventsMetric).Set(float64(inUseEvents.Load()))
}

func (p *Pipeline) maintenance() {
inputEvents := newDeltaWrapper()
inputSize := newDeltaWrapper()
@@ -627,6 +650,7 @@ func (p *Pipeline) maintenance() {
p.metricsHolder.maintenance()

myDeltas := p.incMetrics(inputEvents, inputSize, outputEvents, outputSize, readOps)
p.setMetrics(p.eventPool.inUseEvents)
p.logChanges(myDeltas)

if len(p.inSample) > 0 {
21 changes: 21 additions & 0 deletions plugin/README.md
Original file line number Diff line number Diff line change
@@ -215,6 +215,27 @@ pipelines:
```

[More details...](plugin/action/join/README.md)
## join_template
Alias to "join" plugin with predefined `start` and `continue` parameters.

> ⚠ Parsing the whole event flow could be very CPU intensive because the plugin uses regular expressions.
> Consider `match_fields` parameter to process only particular events. Check out an example for details.

**Example of joining Go panics**:
```yaml
pipelines:
example_pipeline:
...
actions:
- type: join_template
template: go_panic
field: log
match_fields:
stream: stderr // apply only for events which was written to stderr to save CPU time
...
```

[More details...](plugin/action/join_template/README.md)
## json_decode
It decodes a JSON string from the event field and merges the result with the event root.
If the decoded JSON isn't an object, the event will be skipped.
21 changes: 21 additions & 0 deletions plugin/action/README.md
Original file line number Diff line number Diff line change
@@ -73,6 +73,27 @@ pipelines:
```

[More details...](plugin/action/join/README.md)
## join_template
Alias to "join" plugin with predefined `start` and `continue` parameters.

> ⚠ Parsing the whole event flow could be very CPU intensive because the plugin uses regular expressions.
> Consider `match_fields` parameter to process only particular events. Check out an example for details.
**Example of joining Go panics**:
```yaml
pipelines:
example_pipeline:
...
actions:
- type: join_template
template: go_panic
field: log
match_fields:
stream: stderr // apply only for events which was written to stderr to save CPU time
...
```

[More details...](plugin/action/join_template/README.md)
## json_decode
It decodes a JSON string from the event field and merges the result with the event root.
If the decoded JSON isn't an object, the event will be skipped.
13 changes: 7 additions & 6 deletions plugin/action/add_host/add_host.go
Original file line number Diff line number Diff line change
@@ -10,17 +10,18 @@ import (
/*{ introduction
It adds field containing hostname to an event.
}*/

type Plugin struct {
config *Config
}

//! config-params
//^ config-params
// ! config-params
// ^ config-params
type Config struct {
//> @3@4@5@6
//>
//> The event field to which put the hostname. Must be a string.
Field string `json:"field" default:"host" required:"true"` //*
// > @3@4@5@6
// >
// > The event field to which put the hostname. Must be a string.
Field string `json:"field" default:"host" required:"true"` // *
}

func init() {
Loading

0 comments on commit 7b68338

Please sign in to comment.