Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
meta info in pipeline
Browse files Browse the repository at this point in the history
DmitryRomanov committed Mar 14, 2024
1 parent 766777f commit db9d0cf
Showing 13 changed files with 131 additions and 36 deletions.
42 changes: 42 additions & 0 deletions pipeline/metadata_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package pipeline

import (
"bytes"
"html/template"

"github.com/ozontech/file.d/cfg"
)

type MetaData map[string]string

type MetaRegistry struct {
templates cfg.MetaTemplates

Check failure on line 13 in pipeline/metadata_registry.go

GitHub Actions / lint

undefined: cfg.MetaTemplates

Check failure on line 13 in pipeline/metadata_registry.go

GitHub Actions / lint

undefined: cfg.MetaTemplates

Check failure on line 13 in pipeline/metadata_registry.go

GitHub Actions / lint

undefined: cfg.MetaTemplates

Check failure on line 13 in pipeline/metadata_registry.go

GitHub Actions / e2e_test

undefined: cfg.MetaTemplates

Check failure on line 13 in pipeline/metadata_registry.go

GitHub Actions / e2e_test (-race)

undefined: cfg.MetaTemplates

Check failure on line 13 in pipeline/metadata_registry.go

GitHub Actions / test

undefined: cfg.MetaTemplates

Check failure on line 13 in pipeline/metadata_registry.go

GitHub Actions / test (-race)

undefined: cfg.MetaTemplates
values map[string]interface{}
}

func NewMetaRegistry(templates cfg.MetaTemplates) *MetaRegistry {

Check failure on line 17 in pipeline/metadata_registry.go

GitHub Actions / lint

undefined: cfg.MetaTemplates) (typecheck)

Check failure on line 17 in pipeline/metadata_registry.go

GitHub Actions / lint

undefined: cfg.MetaTemplates) (typecheck)

Check failure on line 17 in pipeline/metadata_registry.go

GitHub Actions / lint

undefined: cfg.MetaTemplates) (typecheck)

Check failure on line 17 in pipeline/metadata_registry.go

GitHub Actions / e2e_test

undefined: cfg.MetaTemplates

Check failure on line 17 in pipeline/metadata_registry.go

GitHub Actions / e2e_test (-race)

undefined: cfg.MetaTemplates

Check failure on line 17 in pipeline/metadata_registry.go

GitHub Actions / test

undefined: cfg.MetaTemplates

Check failure on line 17 in pipeline/metadata_registry.go

GitHub Actions / test (-race)

undefined: cfg.MetaTemplates
meta := MetaRegistry{
templates: templates,
values: make(map[string]interface{}),
}
return &meta
}

func (m *MetaRegistry) Set(k string, v interface{}) {
m.values[k] = v
}

func (m *MetaRegistry) GetMeta() (MetaData, error) {
meta := MetaData{}
for k, v := range m.templates {
tmpl := template.Must(template.New("").Parse(v))
var tplOutput bytes.Buffer
err := tmpl.Execute(&tplOutput, m.values)
if err != nil {
return meta, err
} else {
meta[k] = tplOutput.String()
}
}
return meta, nil
}
21 changes: 19 additions & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
@@ -43,7 +43,7 @@ const (
type finalizeFn = func(event *Event, notifyInput bool, backEvent bool)

type InputPluginController interface {
In(sourceID SourceID, sourceName string, offset int64, data []byte, isNewSource bool) uint64
In(sourceID SourceID, sourceName string, offset int64, data []byte, isNewSource bool, meta MetaData) uint64
UseSpread() // don't use stream field and spread all events across all processors
DisableStreams() // don't use stream field
SuggestDecoder(t decoder.DecoderType) // set decoder if pipeline uses "auto" value for decoder
@@ -340,7 +340,7 @@ func (p *Pipeline) GetOutput() OutputPlugin {
}

// In decodes message and passes it to event stream.
func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool) (seqID uint64) {
func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool, meta MetaData) (seqID uint64) {
length := len(bytes)

// don't process mud.
@@ -463,6 +463,23 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
p.logger.Panic("unknown decoder", zap.Int("decoder", int(dec)))
}

if len(meta) > 0 {
if event.Root.IsArray() {
nodeArray := event.Root.AsArray()
for _, elem := range nodeArray {
if elem.IsObject() {
for k, v := range meta {
elem.AddField(k).MutateToString(v)
}
}
}
} else {
for k, v := range meta {
CreateNestedField(event.Root, []string{k}).MutateToString(v)
}
}
}

event.Offset = offset
event.SourceID = sourceID
event.SourceName = sourceName
4 changes: 2 additions & 2 deletions pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ func TestInUnparsableMessages(t *testing.T) {

pipe.SetInput(getFakeInputInfo())

seqID := pipe.In(sourceID, "kafka", offset, message, false)
seqID := pipe.In(sourceID, "kafka", offset, message, false, nil)
require.Equal(t, pipeline.EventSeqIDError, seqID)

refPipe := reflect.ValueOf(pipe)
@@ -119,7 +119,7 @@ func TestInInvalidMessages(t *testing.T) {

pipe.SetInput(getFakeInputInfo())

seqID := pipe.In(tCase.sourceID, "kafka", tCase.offset, tCase.message, false)
seqID := pipe.In(tCase.sourceID, "kafka", tCase.offset, tCase.message, false, nil)
require.Equal(t, pipeline.EventSeqIDError, seqID)
})
}
2 changes: 1 addition & 1 deletion plugin/input/dmesg/dmesg.go
Original file line number Diff line number Diff line change
@@ -111,7 +111,7 @@ func (p *Plugin) read() {

out = root.Encode(out[:0])

p.controller.In(0, "dmesg", ts, out, false)
p.controller.In(0, "dmesg", ts, out, false, nil)
}
}

2 changes: 1 addition & 1 deletion plugin/input/fake/fake.go
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ func (p *Plugin) In(sourceID pipeline.SourceID, sourceName string, offset int64,
if p.inFn != nil {
p.inFn()
}
_ = p.controller.In(sourceID, sourceName, offset, bytes, false)
_ = p.controller.In(sourceID, sourceName, offset, bytes, false, nil)
}

// > It sets up a hook to make sure the test event has been successfully committed.
4 changes: 2 additions & 2 deletions plugin/input/file/worker.go
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ type worker struct {

type inputer interface {
// In processes event and returns it seq number.
In(sourceID pipeline.SourceID, sourceName string, offset int64, data []byte, isNewSource bool) uint64
In(sourceID pipeline.SourceID, sourceName string, offset int64, data []byte, isNewSource bool, meta pipeline.MetaData) uint64
IncReadOps()
IncMaxEventSizeExceeded()
}
@@ -109,7 +109,7 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
inBuf = accumBuf
}

job.lastEventSeq = controller.In(sourceID, sourceName, lastOffset+scanned, inBuf, isVirgin)
job.lastEventSeq = controller.In(sourceID, sourceName, lastOffset+scanned, inBuf, isVirgin, nil)
}
// restore the line buffer
accumBuf = accumBuf[:0]
2 changes: 1 addition & 1 deletion plugin/input/file/worker_test.go
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ func (i *inputerMock) IncReadOps() {}

func (i *inputerMock) IncMaxEventSizeExceeded() {}

func (i *inputerMock) In(_ pipeline.SourceID, _ string, _ int64, data []byte, _ bool) uint64 {
func (i *inputerMock) In(_ pipeline.SourceID, _ string, _ int64, data []byte, _ bool, _ pipeline.MetaData) uint64 {
i.gotData = append(i.gotData, string(data))
return 0
}
3 changes: 3 additions & 0 deletions plugin/input/http/README.md
Original file line number Diff line number Diff line change
@@ -92,6 +92,9 @@ You can use 'warn' log level for logging authorizations.

<br>

**`meta`**
<br>

**`strategy`** *`string`* *`default=disabled`* *`options=disabled|basic|bearer`*

AuthStrategy.Strategy describes strategy to use.
65 changes: 49 additions & 16 deletions plugin/input/http/http.go
Original file line number Diff line number Diff line change
@@ -2,12 +2,14 @@ package http

import (
"io"
"net"
"net/http"
"strings"
"sync"
"time"

"github.com/klauspost/compress/gzip"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
@@ -144,6 +146,8 @@ type Config struct {
// > See AuthConfig for details.
// > You can use 'warn' log level for logging authorizations.
Auth AuthConfig `json:"auth" child:"true"` // *

Meta cfg.MetaTemplates `json:"meta"` // *
}

type AuthStrategy byte
@@ -279,7 +283,8 @@ func (p *Plugin) putSourceID(x pipeline.SourceID) {
}

func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ok := p.auth(r)
ok, login := p.auth(r)

if !ok {
p.failedAuthTotal.Inc()
p.errorsTotal.Inc()
@@ -292,6 +297,11 @@ func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

meta := pipeline.NewMetaRegistry(p.config.Meta)
meta.Set("login", login)
meta.Set("request", r)
meta.Set("remote_addr", getUserIP(r))

path := r.URL.Path
switch p.config.EmulateMode_ {
case EmulateModeElasticSearch:
@@ -300,7 +310,7 @@ func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {

switch path {
case "/_bulk":
p.serveBulk(w, r)
p.serveBulk(w, r, meta)
return
case "/":
p.serveElasticsearchInfo(w, r)
@@ -337,14 +347,14 @@ func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
p.logger.Error("unknown elasticsearch request", zap.String("uri", r.RequestURI), zap.String("method", r.Method))
return
case EmulateModeNo:
p.serveBulk(w, r)
p.serveBulk(w, r, meta)
return
default:
panic("unreachable")
}
}

func (p *Plugin) serveBulk(w http.ResponseWriter, r *http.Request) {
func (p *Plugin) serveBulk(w http.ResponseWriter, r *http.Request, meta *pipeline.MetaRegistry) {
if r.Method != http.MethodPost {
http.Error(w, "", http.StatusMethodNotAllowed)
return
@@ -366,7 +376,7 @@ func (p *Plugin) serveBulk(w http.ResponseWriter, r *http.Request) {
reader = zr
}

if err := p.processBulk(reader); err != nil {
if err := p.processBulk(reader, meta); err != nil {
p.errorsTotal.Inc()
p.logger.Error("http input read error", zap.Error(err))
http.Error(w, "http input read error", http.StatusBadRequest)
@@ -380,7 +390,7 @@ func (p *Plugin) serveBulk(w http.ResponseWriter, r *http.Request) {
p.processBulkSeconds.Observe(time.Since(start).Seconds())
}

func (p *Plugin) processBulk(r io.Reader) error {
func (p *Plugin) processBulk(r io.Reader, meta *pipeline.MetaRegistry) error {
readBuff := p.newReadBuff()
eventBuff := p.newEventBuffs()
defer p.readBuffs.Put(&readBuff)
@@ -389,6 +399,11 @@ func (p *Plugin) processBulk(r io.Reader) error {
sourceID := p.getSourceID()
defer p.putSourceID(sourceID)

metaInfo, err := meta.GetMeta()
if err != nil {
return err
}

for {
n, err := r.Read(readBuff)
if n == 0 && err == io.EOF {
@@ -399,17 +414,17 @@ func (p *Plugin) processBulk(r io.Reader) error {
return err
}

eventBuff = p.processChunk(sourceID, readBuff[:n], eventBuff, false)
eventBuff = p.processChunk(sourceID, readBuff[:n], eventBuff, false, metaInfo)
}

if len(eventBuff) > 0 {
eventBuff = p.processChunk(sourceID, readBuff[:0], eventBuff, true)
eventBuff = p.processChunk(sourceID, readBuff[:0], eventBuff, true, metaInfo)
}

return nil
}

func (p *Plugin) processChunk(sourceID pipeline.SourceID, readBuff []byte, eventBuff []byte, isLastChunk bool) []byte {
func (p *Plugin) processChunk(sourceID pipeline.SourceID, readBuff []byte, eventBuff []byte, isLastChunk bool, meta pipeline.MetaData) []byte {
pos := 0 // current position
nlPos := 0 // new line position
for pos < len(readBuff) {
@@ -420,10 +435,10 @@ func (p *Plugin) processChunk(sourceID pipeline.SourceID, readBuff []byte, event

if len(eventBuff) != 0 {
eventBuff = append(eventBuff, readBuff[nlPos:pos]...)
_ = p.controller.In(sourceID, "http", int64(pos), eventBuff, true)
_ = p.controller.In(sourceID, "http", int64(pos), eventBuff, true, meta)
eventBuff = eventBuff[:0]
} else {
_ = p.controller.In(sourceID, "http", int64(pos), readBuff[nlPos:pos], true)
_ = p.controller.In(sourceID, "http", int64(pos), readBuff[nlPos:pos], true, meta)
}

pos++
@@ -432,7 +447,7 @@ func (p *Plugin) processChunk(sourceID pipeline.SourceID, readBuff []byte, event

if isLastChunk {
// flush buffers if we can't find the newline character
_ = p.controller.In(sourceID, "http", int64(pos), append(eventBuff, readBuff[nlPos:]...), true)
_ = p.controller.In(sourceID, "http", int64(pos), append(eventBuff, readBuff[nlPos:]...), true, meta)
eventBuff = eventBuff[:0]
} else {
eventBuff = append(eventBuff, readBuff[nlPos:]...)
@@ -453,9 +468,9 @@ func (p *Plugin) PassEvent(_ *pipeline.Event) bool {
return true
}

func (p *Plugin) auth(req *http.Request) bool {
func (p *Plugin) auth(req *http.Request) (bool, string) {
if p.config.Auth.Strategy_ == StrategyDisabled {
return true
return true, ""
}

var secretName string
@@ -469,10 +484,10 @@ func (p *Plugin) auth(req *http.Request) bool {
panic("unreachable")
}
if !ok {
return false
return false, ""
}
p.successfulAuthTotal[secretName].Inc()
return true
return true, secretName
}

func (p *Plugin) authBasic(req *http.Request) (string, bool) {
@@ -508,3 +523,21 @@ func (p *Plugin) putGzipReader(reader *gzip.Reader) {
_ = reader.Close()
p.gzipReaderPool.Put(reader)
}

func getUserIP(r *http.Request) net.IP {
var userIP string
switch {
case len(r.Header.Get("CF-Connecting-IP")) > 1:
userIP = r.Header.Get("CF-Connecting-IP")
case len(r.Header.Get("X-Forwarded-For")) > 1:
userIP = r.Header.Get("X-Forwarded-For")
case len(r.Header.Get("X-Real-IP")) > 1:
userIP = r.Header.Get("X-Real-IP")
default:
userIP = r.RemoteAddr
if strings.Contains(userIP, ":") {
return net.ParseIP(strings.Split(userIP, ":")[0])
}
}
return net.ParseIP(userIP)
}
Loading
Oops, something went wrong.

0 comments on commit db9d0cf

Please sign in to comment.