Skip to content

Commit

Permalink
meta info in pipeline (#600)
Browse files Browse the repository at this point in the history
* auth with client cert in kafka plugin (#607)

* auth with client cert in kafka plugin

* because you can set content of cert or key

* e2e test for kafka auth by client cert

* don't skip event on meta parse error

* when template for meta is simple

* fix metadata filling after review

* rename metadata registry to templater

* add meta in configs for e2e test

* don't reuse buffer in templater to avoid race

* convertToResultMaps returns strings

* Revert "dont reuse buffer in templater for avoid race"

This reverts commit 60db00c.

* poolBuffer in meta templater

* use sync.pool in template if we have templates

* fix flaky TestServeChunks
  • Loading branch information
DmitryRomanov authored May 23, 2024
1 parent a9fd975 commit e4ace70
Show file tree
Hide file tree
Showing 22 changed files with 432 additions and 42 deletions.
3 changes: 3 additions & 0 deletions cfg/meta_templates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package cfg

// MetaTemplates maps a meta field name to a Go text/template string
// (or a plain "{{ .field }}" reference) used to render event metadata.
type MetaTemplates map[string]string
3 changes: 3 additions & 0 deletions e2e/http_file/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ pipelines:
http_file:
input:
type: http
meta:
remote_addr: "{{ .remote_addr }}"
user_agent: '{{ index (index .request.Header "User-Agent") 0}}'
actions:
- type: discard
match_fields:
Expand Down
3 changes: 3 additions & 0 deletions e2e/kafka_file/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@ pipelines:
input:
type: kafka
offset: oldest
meta:
partition: 'partition_{{ .partition }}'
topic: '{{ .topic }}'
output:
type: file
76 changes: 76 additions & 0 deletions pipeline/metadata/templater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package metadata

import (
"bytes"
"fmt"
"regexp"
"sync"
"text/template"

"github.com/ozontech/file.d/cfg"
)

// MetaData is the rendered set of meta fields attached to an event.
type MetaData map[string]string

// MetaTemplater renders MetaData from configured templates.
type MetaTemplater struct {
// templates holds the compiled non-trivial templates, keyed by meta field name.
templates map[string]*template.Template
// singleValues maps a meta field name to a plain data field name for
// simple "{{ .field }}" values, bypassing the template engine.
singleValues map[string]string
// poolBuffer pools *bytes.Buffer scratch space reused during Render.
poolBuffer sync.Pool
}

// NewMetaTemplater compiles the configured meta templates.
//
// A value that is a single field reference such as "{{ .field }}" is
// detected with a regexp and stored as a plain field lookup, so Render
// can resolve it without invoking the template engine. Every other value
// is parsed as a text/template.
//
// NOTE(review): template.Must panics on an invalid template string, so a
// malformed value in the pipeline config aborts the process here — confirm
// this is the intended config-validation behavior.
func NewMetaTemplater(templates cfg.MetaTemplates) *MetaTemplater {
	simpleValueRe := regexp.MustCompile(`^\{\{ +\.(\w+) +\}\}$`)

	compiled := make(map[string]*template.Template)
	simple := make(map[string]string)
	for name, tpl := range templates {
		if m := simpleValueRe.FindStringSubmatch(tpl); len(m) > 1 {
			simple[name] = m[1]
			continue
		}
		compiled[name] = template.Must(template.New("").Parse(tpl))
	}

	return &MetaTemplater{
		templates:    compiled,
		singleValues: simple,
		poolBuffer: sync.Pool{
			New: func() interface{} { return new(bytes.Buffer) },
		},
	}
}

// Data is the source of values available to meta template rendering.
type Data interface {
// GetData returns the fields the templates may reference.
GetData() map[string]any
}

// Render produces metadata for the given data source.
//
// Simple "{{ .field }}" values are resolved by a direct map lookup (the
// key is silently omitted when the field is absent); all other values are
// executed as templates against data.GetData(). A pooled buffer is reused
// across template executions to avoid per-call allocations.
//
// On a template execution error the metadata collected so far is returned
// together with the error, wrapped with the failing key for diagnostics.
func (m *MetaTemplater) Render(data Data) (MetaData, error) {
	values := data.GetData()
	meta := MetaData{}

	if len(m.templates) > 0 {
		tplOutput := m.poolBuffer.Get().(*bytes.Buffer)
		defer m.poolBuffer.Put(tplOutput)

		for k, tmpl := range m.templates {
			tplOutput.Reset()
			if err := tmpl.Execute(tplOutput, values); err != nil {
				// Wrap with the meta key so the caller can tell which
				// configured template failed to render.
				return meta, fmt.Errorf("rendering meta %q: %w", k, err)
			}
			meta[k] = tplOutput.String()
		}
	}

	for k, field := range m.singleValues {
		if val, ok := values[field]; ok {
			meta[k] = fmt.Sprintf("%v", val)
		}
	}

	return meta, nil
}
43 changes: 43 additions & 0 deletions pipeline/metadata/templater_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package metadata

import (
"fmt"
"testing"

"github.com/ozontech/file.d/cfg"
"github.com/stretchr/testify/assert"
)

// TestTemplaterRender checks that simple field references and full
// templates are both rendered, and that a reference to a field missing
// from the data ("broker") is omitted from the result.
func TestTemplaterRender(t *testing.T) {
	templater := NewMetaTemplater(cfg.MetaTemplates{
		"partition":          "partition_{{ .partition }}",
		"partition_describe": "{{ .partition }} partition",
		"topic":              "{{ .topic }}",
		"broker":             "{{ .broker }}",
	})

	result, err := templater.Render(testMetadata{})
	assert.Nil(t, err)

	expected := map[string]any{
		"topic":              "topic",
		"partition":          "partition_1",
		"partition_describe": "1 partition",
	}
	assert.Equal(t, fmt.Sprint(expected), fmt.Sprint(result))
}

// testMetadata is a stub Data implementation with a fixed payload.
// It intentionally has no "broker" field.
type testMetadata struct{}

// GetData returns the fixture values used by TestTemplaterRender.
func (f testMetadata) GetData() map[string]any {
	data := make(map[string]any, 3)
	data["topic"] = "topic"
	data["partition"] = 1
	data["offset"] = 1000
	return data
}
22 changes: 20 additions & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline/antispam"
"github.com/ozontech/file.d/pipeline/metadata"
"github.com/prometheus/client_golang/prometheus"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/atomic"
Expand Down Expand Up @@ -43,7 +44,7 @@ const (
type finalizeFn = func(event *Event, notifyInput bool, backEvent bool)

type InputPluginController interface {
In(sourceID SourceID, sourceName string, offset int64, data []byte, isNewSource bool) uint64
In(sourceID SourceID, sourceName string, offset int64, data []byte, isNewSource bool, meta metadata.MetaData) uint64
UseSpread() // don't use stream field and spread all events across all processors
DisableStreams() // don't use stream field
SuggestDecoder(t decoder.DecoderType) // set decoder if pipeline uses "auto" value for decoder
Expand Down Expand Up @@ -340,7 +341,7 @@ func (p *Pipeline) GetOutput() OutputPlugin {
}

// In decodes message and passes it to event stream.
func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool) (seqID uint64) {
func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) {
length := len(bytes)

// don't process mud.
Expand Down Expand Up @@ -463,6 +464,23 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
p.logger.Panic("unknown decoder", zap.Int("decoder", int(dec)))
}

if len(meta) > 0 {
if event.Root.IsArray() {
nodeArray := event.Root.AsArray()
for _, elem := range nodeArray {
if elem.IsObject() {
for k, v := range meta {
elem.AddField(k).MutateToString(v)
}
}
}
} else {
for k, v := range meta {
CreateNestedField(event.Root, []string{k}).MutateToString(v)
}
}
}

event.Offset = offset
event.SourceID = sourceID
event.SourceName = sourceName
Expand Down
4 changes: 2 additions & 2 deletions pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestInUnparsableMessages(t *testing.T) {

pipe.SetInput(getFakeInputInfo())

seqID := pipe.In(sourceID, "kafka", offset, message, false)
seqID := pipe.In(sourceID, "kafka", offset, message, false, nil)
require.Equal(t, pipeline.EventSeqIDError, seqID)

refPipe := reflect.ValueOf(pipe)
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestInInvalidMessages(t *testing.T) {

pipe.SetInput(getFakeInputInfo())

seqID := pipe.In(tCase.sourceID, "kafka", tCase.offset, tCase.message, false)
seqID := pipe.In(tCase.sourceID, "kafka", tCase.offset, tCase.message, false, nil)
require.Equal(t, pipeline.EventSeqIDError, seqID)
})
}
Expand Down
4 changes: 4 additions & 0 deletions plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ pipelines:
brokers: [kafka:9092, kafka:9091]
topics: [topic1, topic2]
offset: newest
meta:
partition: '{{ .partition }}'
topic: '{{ .topic }}'
offset: '{{ .offset }}'
# output plugin is not important in this case, let's emulate s3 output.
output:
type: s3
Expand Down
4 changes: 4 additions & 0 deletions plugin/input/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ pipelines:
brokers: [kafka:9092, kafka:9091]
topics: [topic1, topic2]
offset: newest
meta:
partition: '{{ .partition }}'
topic: '{{ .topic }}'
offset: '{{ .offset }}'
# output plugin is not important in this case, let's emulate s3 output.
output:
type: s3
Expand Down
2 changes: 1 addition & 1 deletion plugin/input/dmesg/dmesg.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (p *Plugin) read() {

out = root.Encode(out[:0])

p.controller.In(0, "dmesg", ts, out, false)
p.controller.In(0, "dmesg", ts, out, false, nil)
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/input/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (p *Plugin) In(sourceID pipeline.SourceID, sourceName string, offset int64,
if p.inFn != nil {
p.inFn()
}
_ = p.controller.In(sourceID, sourceName, offset, bytes, false)
_ = p.controller.In(sourceID, sourceName, offset, bytes, false, nil)
}

// > It sets up a hook to make sure the test event has been successfully committed.
Expand Down
5 changes: 3 additions & 2 deletions plugin/input/file/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"

"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/pipeline/metadata"
"go.uber.org/zap"
)

Expand All @@ -15,7 +16,7 @@ type worker struct {

type inputer interface {
// In processes event and returns it seq number.
In(sourceID pipeline.SourceID, sourceName string, offset int64, data []byte, isNewSource bool) uint64
In(sourceID pipeline.SourceID, sourceName string, offset int64, data []byte, isNewSource bool, meta metadata.MetaData) uint64
IncReadOps()
IncMaxEventSizeExceeded()
}
Expand Down Expand Up @@ -109,7 +110,7 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
inBuf = accumBuf
}

job.lastEventSeq = controller.In(sourceID, sourceName, lastOffset+scanned, inBuf, isVirgin)
job.lastEventSeq = controller.In(sourceID, sourceName, lastOffset+scanned, inBuf, isVirgin, nil)
}
// restore the line buffer
accumBuf = accumBuf[:0]
Expand Down
3 changes: 2 additions & 1 deletion plugin/input/file/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/pipeline/metadata"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -26,7 +27,7 @@ func (i *inputerMock) IncReadOps() {}

func (i *inputerMock) IncMaxEventSizeExceeded() {}

func (i *inputerMock) In(_ pipeline.SourceID, _ string, _ int64, data []byte, _ bool) uint64 {
// In records the incoming payload so tests can assert on what the worker
// produced; the returned seq number is not used by the tests.
func (i *inputerMock) In(_ pipeline.SourceID, _ string, _ int64, data []byte, _ bool, _ metadata.MetaData) uint64 {
i.gotData = append(i.gotData, string(data))
return 0
}
Expand Down
7 changes: 7 additions & 0 deletions plugin/input/http/README.idoc.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,10 @@

### Config params
@config-params|description

### Meta params
**`login`**

**`remote_addr`** *`net.IP`*

**`request`** *`http.Request`*
18 changes: 18 additions & 0 deletions plugin/input/http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ You can use 'warn' log level for logging authorizations.

<br>

**`meta`** *`cfg.MetaTemplates`*

Meta params

Add meta information to an event (look at Meta params)
Use [go-template](https://pkg.go.dev/text/template) syntax

Example: ```user_agent: '{{ index (index .request.Header "User-Agent") 0}}'```

<br>

**`strategy`** *`string`* *`default=disabled`* *`options=disabled|basic|bearer`*

AuthStrategy.Strategy describes strategy to use.
Expand All @@ -108,4 +119,11 @@ Key uses in the http_input_total metric.
<br>


### Meta params
**`login`**

**`remote_addr`** *`net.IP`*

**`request`** *`http.Request`*

<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
Loading

0 comments on commit e4ace70

Please sign in to comment.