Skip to content

Commit

Permalink
Consider stream offset in pipeline (#683)
Browse files Browse the repository at this point in the history
* offsets interface in pipeline in

* skip already committed stream

* fix doc for fake input && linter && race on get offsets

* fix log offset of wrong format after rebase

* if stream offset not found

* don't cast offset in http input
  • Loading branch information
DmitryRomanov authored Dec 11, 2024
1 parent 75710ca commit cfe8d68
Show file tree
Hide file tree
Showing 37 changed files with 194 additions and 86 deletions.
22 changes: 17 additions & 5 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
type finalizeFn = func(event *Event, notifyInput bool, backEvent bool)

type InputPluginController interface {
In(sourceID SourceID, sourceName string, offset int64, data []byte, isNewSource bool, meta metadata.MetaData) uint64
In(sourceID SourceID, sourceName string, offset Offsets, data []byte, isNewSource bool, meta metadata.MetaData) uint64
UseSpread() // don't use stream field and spread all events across all processors
DisableStreams() // don't use stream field
SuggestDecoder(t decoder.Type) // set decoder type if pipeline uses "auto" value for decoder
Expand Down Expand Up @@ -412,8 +412,13 @@ func (p *Pipeline) GetOutput() OutputPlugin {
return p.output
}

// Offsets describes the position of a message within its source. It is the
// value input plugins hand to Pipeline.In as the offset argument, and it lets
// the pipeline compare the message's own offset with an offset tracked
// per stream.
type Offsets interface {
// Current returns the offset of the message being passed to the pipeline.
// Pipeline.In stores this value in Event.Offset and reports it in its
// "wrong cri format" / "wrong log format" log messages.
Current() int64
// ByStream returns the offset associated with the given stream name.
// Pipeline.In calls it with the stream of the decoded CRI row and drops the
// message when the result is positive and greater than Current(); a result
// that is zero or negative never causes a drop.
// NOTE(review): presumably this is the last committed offset of the stream
// and a non-positive value means "stream not found" — confirm against the
// input plugin implementations.
ByStream(stream string) int64
}

// In decodes message and passes it to event stream.
func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) {
func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) {
// don't process mud.
var ok bool
bytes, ok = p.checkInputBytes(bytes, sourceName, meta)
Expand Down Expand Up @@ -443,7 +448,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
row, err = decoder.DecodeCRI(bytes)
if err != nil {
p.wrongEventCRIFormatMetric.Inc()
p.Error(fmt.Sprintf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offset, length, err.Error(), sourceID, sourceName, bytes))
p.Error(fmt.Sprintf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offset.Current(), length, err.Error(), sourceID, sourceName, bytes))
return EventSeqIDError
}
}
Expand All @@ -457,6 +462,13 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
// For example, for containerd this setting is called max_container_log_line_size
// https://github.com/containerd/containerd/blob/f7f2be732159a411eae46b78bfdb479b133a823b/pkg/cri/config/config.go#L263-L266
if !row.IsPartial && p.settings.AntispamThreshold > 0 {
streamOffset := offset.ByStream(string(row.Stream))
currentOffset := offset.Current()

if streamOffset > 0 && currentOffset < streamOffset {
return EventSeqIDError
}

var checkSourceID any
var checkSourceName string
if p.settings.SourceNameMetaField == "" {
Expand Down Expand Up @@ -524,7 +536,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
}

p.logger.Log(level, "wrong log format", zap.Error(err),
zap.Int64("offset", offset),
zap.Int64("offset", offset.Current()),
zap.Int("length", length),
zap.Uint64("source", uint64(sourceID)),
zap.String("source_name", sourceName),
Expand Down Expand Up @@ -552,7 +564,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
}
}

event.Offset = offset
event.Offset = offset.Current()
event.SourceID = sourceID
event.SourceName = sourceName
event.streamName = DefaultStreamName
Expand Down
5 changes: 3 additions & 2 deletions pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ozontech/file.d/pipeline/metadata"
"github.com/ozontech/file.d/plugin/input/fake"
"github.com/ozontech/file.d/plugin/output/devnull"
"github.com/ozontech/file.d/test"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -66,7 +67,7 @@ func TestInInvalidMessages(t *testing.T) {

pipe.SetInput(getFakeInputInfo())

seqID := pipe.In(tCase.sourceID, "kafka", tCase.offset, tCase.message, false, nil)
seqID := pipe.In(tCase.sourceID, "kafka", test.Offset(tCase.offset), tCase.message, false, nil)
require.Equal(t, pipeline.EventSeqIDError, seqID)
})
}
Expand Down Expand Up @@ -101,7 +102,7 @@ func BenchmarkMetaTemplater(b *testing.B) {
"/k8s-logs/advanced-logs-checker-1566485760-trtrq-%d_sre-%d_duty-bot-4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0%d.log",
rest, rest, rest,
),
int64(i),
test.Offset(i),
[]byte("2016-10-06T00:17:09.669794202Z stdout P partial content 1\n"),
false,
metadata.MetaData{},
Expand Down
4 changes: 2 additions & 2 deletions plugin/action/add_file_name/add_file_name_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func TestModify(t *testing.T) {
wg.Done()
})

input.In(0, sourceName, 0, []byte(`{"error":"info about error"}`))
input.In(0, sourceName, 0, []byte(`{"file":"not_my_file"}`))
input.In(0, sourceName, test.Offset(0), []byte(`{"error":"info about error"}`))
input.In(0, sourceName, test.Offset(0), []byte(`{"file":"not_my_file"}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/add_host/add_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestModify(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{}`))
input.In(0, "test.log", test.Offset(0), []byte(`{}`))

wg.Wait()
p.Stop()
Expand Down
8 changes: 4 additions & 4 deletions plugin/action/convert_date/convert_date_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ func TestConvert(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"time":998578502}`))
input.In(0, "test.log", 0, []byte(`{"time":998578999.1346}`))
input.In(0, "test.log", 0, []byte(`{"time":"2022/02/07 13:06:14"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"time":998578502}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"time":998578999.1346}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"time":"2022/02/07 13:06:14"}`))

wg.Wait()
p.Stop()
Expand All @@ -52,7 +52,7 @@ func TestConvertFail(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"time":"XXX"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"time":"XXX"}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/convert_log_level/convert_log_level_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestDo(t *testing.T) {
})

for _, log := range tc.In {
input.In(0, "test.log", 0, []byte(log))
input.In(0, "test.log", test.Offset(0), []byte(log))
}

now := time.Now()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestConvertUTF8Bytes(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(tt.in))
input.In(0, "test.log", test.Offset(0), []byte(tt.in))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/decode/decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func TestDecode(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, tt.input)
input.In(0, "test.log", test.Offset(0), tt.input)

wg.Wait()
p.Stop()
Expand Down
4 changes: 2 additions & 2 deletions plugin/action/discard/discard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ func TestDiscard(t *testing.T) {
})

for _, e := range tt.passEvents {
input.In(0, "test", 0, []byte(e))
input.In(0, "test", test.Offset(0), []byte(e))
}
for _, e := range tt.discardEvents {
input.In(0, "test", 0, []byte(e))
input.In(0, "test", test.Offset(0), []byte(e))
}

wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/flatten/flatten_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestFlatten(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"complex":{"a":"b","c":"d"}}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"complex":{"a":"b","c":"d"}}`))

wg.Wait()
p.Stop()
Expand Down
4 changes: 2 additions & 2 deletions plugin/action/join/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestSimpleJoin(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", int64(i*10000+m), []byte(line))
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
}
}

Expand Down Expand Up @@ -271,7 +271,7 @@ func TestJoinAfterNilNode(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", int64(i*10000+m), []byte(line))
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
}
}

Expand Down
4 changes: 2 additions & 2 deletions plugin/action/join_template/join_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func TestSimpleJoin(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", int64(i*10000+m), []byte(line))
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
}
}

Expand Down Expand Up @@ -534,7 +534,7 @@ func TestJoinAfterNilNode(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", int64(i*10000+m), []byte(line))
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/action/json_decode/json_decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestDecode(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"log":"{\"field2\":\"value2\",\"field3\":\"value3\"}"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"log":"{\"field2\":\"value2\",\"field3\":\"value3\"}"}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/json_encode/json_encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestEncode(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"server":{"os":"linux","arch":"amd64"}}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"server":{"os":"linux","arch":"amd64"}}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/json_extract/json_extract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestJsonExtract(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(tt.in))
input.In(0, "test.log", test.Offset(0), []byte(tt.in))

wg.Wait()
p.Stop()
Expand Down
6 changes: 3 additions & 3 deletions plugin/action/keep_fields/keep_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ func TestKeepFields(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"field_1":"value_1","a":"b"}`))
input.In(0, "test.log", 0, []byte(`{"field_2":"value_2","b":"c"}`))
input.In(0, "test.log", 0, []byte(`{"field_3":"value_3","a":"b"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"field_1":"value_1","a":"b"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"field_2":"value_2","b":"c"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"field_3":"value_3","a":"b"}`))

wg.Wait()
p.Stop()
Expand Down
6 changes: 3 additions & 3 deletions plugin/action/mask/mask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ func TestPlugin(t *testing.T) {
})

for _, in := range s.input {
input.In(0, "test.log", 0, []byte(in))
input.In(0, "test.log", test.Offset(0), []byte(in))
}

wg.Wait()
Expand Down Expand Up @@ -899,7 +899,7 @@ func TestWithEmptyRegex(t *testing.T) {
})

for _, in := range s.input {
input.In(0, "test.log", 0, []byte(in))
input.In(0, "test.log", test.Offset(0), []byte(in))
}

wg.Wait()
Expand Down Expand Up @@ -1090,7 +1090,7 @@ func TestPluginWithComplexMasks(t *testing.T) {
})

for _, in := range s.input {
input.In(0, "test.log", 0, []byte(in))
input.In(0, "test.log", test.Offset(0), []byte(in))
}

wg.Wait()
Expand Down
6 changes: 3 additions & 3 deletions plugin/action/modify/modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestModify(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"existing_field":"existing_value","my_object":{"field":{"subfield":"subfield_value"}}}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"existing_field":"existing_value","my_object":{"field":{"subfield":"subfield_value"}}}`))

wg.Wait()
p.Stop()
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestModifyRegex(t *testing.T) {
wg.Add(len(testEvents))

for _, te := range testEvents {
input.In(0, "test.log", 0, te.in)
input.In(0, "test.log", test.Offset(0), te.in)
}

wg.Wait()
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestModifyTrim(t *testing.T) {
wg.Add(len(testEvents))

for _, te := range testEvents {
input.In(0, "test.log", 0, te.in)
input.In(0, "test.log", test.Offset(0), te.in)
}

wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/move/move_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestMove(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(tt.in))
input.In(0, "test.log", test.Offset(0), []byte(tt.in))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/parse_es/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestPipeline(t *testing.T) {
})

for _, event := range tCase.eventsIn {
input.In(event.sourceID, event.sourceName, event.offset, event.bytes)
input.In(event.sourceID, event.sourceName, test.Offset(event.offset), event.bytes)
}

wg.Wait()
Expand Down
6 changes: 3 additions & 3 deletions plugin/action/parse_re2/parse_re2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestDecode(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"log":"2021-06-22 16:24:27 GMT [7291] => [2-1] client=test_client,db=test_db,user=test_user LOG: listening on IPv4 address \"0.0.0.0\", port 5432"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"log":"2021-06-22 16:24:27 GMT [7291] => [2-1] client=test_client,db=test_db,user=test_user LOG: listening on IPv4 address \"0.0.0.0\", port 5432"}`))

wg.Wait()
p.Stop()
Expand All @@ -46,8 +46,8 @@ func TestDecodeAccessLogsJira(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"message":"10.115.195.13 0x51320775x2 jira_robot [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/FRAUD-3847?fields=resolution HTTP/1.1\" 200 198 20 \"https://jit.o3.ru/secure/RapidBoard.jspa?rapidView=2701&selectedIssue=EXPC-3767&quickFilter=16465&quickFilter=15365\" \"Apache-HttpClient/4.5.13 (Java/11.0.9)\" \"nj56zg\""}`))
input.In(0, "test.log", 0, []byte(`{"message":"10.115.195.12 0x51320774x2 ezabelin [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/RP-4977?fields=resolution HTTP/1.1\" 201 158 15 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36\" \"1tmznt9\""}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"message":"10.115.195.13 0x51320775x2 jira_robot [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/FRAUD-3847?fields=resolution HTTP/1.1\" 200 198 20 \"https://jit.o3.ru/secure/RapidBoard.jspa?rapidView=2701&selectedIssue=EXPC-3767&quickFilter=16465&quickFilter=15365\" \"Apache-HttpClient/4.5.13 (Java/11.0.9)\" \"nj56zg\""}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"message":"10.115.195.12 0x51320774x2 ezabelin [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/RP-4977?fields=resolution HTTP/1.1\" 201 158 15 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36\" \"1tmznt9\""}`))

wg.Wait()
p.Stop()
Expand Down
12 changes: 6 additions & 6 deletions plugin/action/remove_fields/remove_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func TestRemoveFields(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"field_1":"value_1","a":"b"}`))
input.In(0, "test.log", 0, []byte(`{"field_2":"value_2","b":"c"}`))
input.In(0, "test.log", 0, []byte(`{"field_3":"value_3","a":"b"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"field_1":"value_1","a":"b"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"field_2":"value_2","b":"c"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"field_3":"value_3","a":"b"}`))

wg.Wait()
p.Stop()
Expand All @@ -47,9 +47,9 @@ func TestRemoveNestedFields(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"a":"some"}`))
input.In(0, "test.log", 0, []byte(`{"a":{"b":"deleted"}}`))
input.In(0, "test.log", 0, []byte(`{"a":{"b":{"c":["deleted"]},"d":"saved"}}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"a":"some"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"a":{"b":"deleted"}}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"a":{"b":{"c":["deleted"]},"d":"saved"}}`))

wg.Wait()
p.Stop()
Expand Down
Loading

0 comments on commit cfe8d68

Please sign in to comment.