Code review fixes, S3 and throttle test fixes
Gleb Zakharov committed Nov 29, 2021
1 parent bf307fc commit 2e55c19
Showing 23 changed files with 102 additions and 103 deletions.
fd/util.go: 12 changes (6 additions & 6 deletions)

@@ -13,8 +13,8 @@ import (
 func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
 	capacity := pipeline.DefaultCapacity
 	antispamThreshold := 0
-	avgLogSize := pipeline.DefaultAvgLogSize
-	maxLogSize := pipeline.DefaultMaxLogSize
+	avgInputEventSize := pipeline.DefaultAvgInputEventSize
+	maxInputEventSize := pipeline.DefaultMaxInputEventSize
 	streamField := pipeline.DefaultStreamField
 	maintenanceInterval := pipeline.DefaultMaintenanceInterval
 	decoder := "auto"
@@ -29,12 +29,12 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
 
 	val = settings.Get("avg_log_size").MustInt()
 	if val != 0 {
-		avgLogSize = val
+		avgInputEventSize = val
 	}
 
 	val = settings.Get("max_log_size").MustInt()
 	if val != 0 {
-		maxLogSize = val
+		maxInputEventSize = val
 	}
 
 	str := settings.Get("decoder").MustString()
@@ -74,8 +74,8 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
 	return &pipeline.Settings{
 		Decoder:             decoder,
 		Capacity:            capacity,
-		AvgLogSize:          avgLogSize,
-		MaxLogSize:          maxLogSize,
+		AvgEventSize:        avgInputEventSize,
+		MaxEventSize:        maxInputEventSize,
 		AntispamThreshold:   antispamThreshold,
 		MaintenanceInterval: maintenanceInterval,
 		EventTimeout:        eventTimeout,
pipeline/pipeline.go: 10 changes (5 additions & 5 deletions)

@@ -22,8 +22,8 @@ import (
 const (
 	DefaultStreamField         = "stream"
 	DefaultCapacity            = 1024
-	DefaultAvgLogSize          = 16 * 1024
-	DefaultMaxLogSize          = 0
+	DefaultAvgInputEventSize   = 16 * 1024
+	DefaultMaxInputEventSize   = 0
 	DefaultJSONNodePoolSize    = 1024
 	DefaultMaintenanceInterval = time.Second * 5
 	DefaultEventTimeout        = time.Second * 30
@@ -107,8 +107,8 @@ type Settings struct {
 	MaintenanceInterval time.Duration
 	EventTimeout        time.Duration
 	AntispamThreshold   int
-	AvgLogSize          int
-	MaxLogSize          int
+	AvgEventSize        int
+	MaxEventSize        int
 	StreamField         string
 	IsStrict            bool
 }
@@ -267,7 +267,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
 	// don't process shit
 	isEmpty := length == 0 || (bytes[0] == '\n' && length == 1)
 	isSpam := p.antispamer.isSpam(sourceID, sourceName, isNewSource)
-	isLong := p.settings.MaxLogSize != 0 && length > p.settings.MaxLogSize
+	isLong := p.settings.MaxEventSize != 0 && length > p.settings.MaxEventSize
 	if isEmpty || isSpam || isLong {
 		return 0
 	}
pipeline/stream.go: 2 changes (1 addition & 1 deletion)

@@ -168,7 +168,7 @@ func (s *stream) tryUnblock() bool {
 	}
 
 	s.mu.Lock()
-	if time.Now().Sub(s.blockTime) < s.streamer.eventTimeout {
+	if time.Since(s.blockTime) < s.streamer.eventTimeout {
 		s.mu.Unlock()
 		return false
 	}
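Side note on the `stream.go` change: in the Go standard library, `time.Since(t)` is defined as exactly `time.Now().Sub(t)`, so the new code is behaviorally identical; it is just the idiomatic spelling (the one staticcheck's S1012 check suggests). A minimal standalone sketch, not part of the commit:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	blockTime := time.Now().Add(-2 * time.Second)

	// Both expressions measure the same elapsed duration;
	// time.Since(t) is implemented as time.Now().Sub(t).
	oldStyle := time.Now().Sub(blockTime)
	newStyle := time.Since(blockTime)

	fmt.Println(oldStyle.Round(time.Second), newStyle.Round(time.Second)) // 2s 2s
}
```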
plugin/action/join/join.go: 20 changes (12 additions & 8 deletions)

@@ -66,10 +66,10 @@ type Plugin struct {
 	controller pipeline.ActionPluginController
 	config     *Config
 
-	isJoining  bool
-	initial    *pipeline.Event
-	buff       []byte
-	maxLogSize int
+	isJoining    bool
+	initial      *pipeline.Event
+	buff         []byte
+	maxEventSize int
 
 	logger *zap.SugaredLogger
 }
@@ -94,6 +94,11 @@ type Config struct {
 	//> A regexp which will continue the join sequence.
 	Continue  cfg.Regexp `json:"continue" required:"true" parse:"regexp"` //*
 	Continue_ *regexp.Regexp
+
+	//> @3@4@5@6
+	//>
+	//> Max size of the resulting event. If it is set and the event exceeds the limit, the event will be truncated.
+	MaxEventSize int `json:"max_event_size" default:"0"` //*
 }
 
 func init() {
@@ -111,8 +116,8 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
 	p.controller = params.Controller
 	p.config = config.(*Config)
 	p.isJoining = false
-	p.buff = make([]byte, 0, params.PipelineSettings.AvgLogSize)
-	p.maxLogSize = params.PipelineSettings.MaxLogSize
+	p.buff = make([]byte, 0, params.PipelineSettings.AvgEventSize)
+	p.maxEventSize = p.config.MaxEventSize
 	p.logger = params.Logger
 }
 
@@ -164,8 +169,7 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
 	if p.isJoining {
 		nextOK := p.config.Continue_.MatchString(value)
 		if nextOK {
-			// append buff until it exceeds max_log_size.
-			if p.maxLogSize == 0 || len(p.buff) < p.maxLogSize {
+			if p.maxEventSize == 0 || len(p.buff) < p.maxEventSize {
 				p.buff = append(p.buff, value...)
 			}
 			return pipeline.ActionCollapse
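The new `max_event_size` option caps how far the join buffer can grow. Note that the guard runs before the append, so a joined event can overshoot the limit by up to one fragment. A minimal sketch of the pattern with illustrative names (`appendCapped` is not part of the plugin's API):

```go
package main

import "fmt"

// appendCapped mimics the plugin's guard: once buff holds at least max
// bytes, further fragments are dropped; max == 0 disables the limit.
// Because the check precedes the append, the result may exceed max by
// up to one fragment.
func appendCapped(buff, fragment []byte, max int) []byte {
	if max == 0 || len(buff) < max {
		buff = append(buff, fragment...)
	}
	return buff
}

func main() {
	buff := []byte("panic: runtime error\n") // 21 bytes
	for _, line := range []string{"goroutine 1 [running]:\n", "main.main()\n"} {
		buff = appendCapped(buff, []byte(line), 32)
	}
	// 21 < 32 allowed one more append (now 44 bytes), after which the
	// remaining fragment is dropped.
	fmt.Printf("%d bytes\n", len(buff)) // 44 bytes
}
```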
plugin/action/join/join_test.go: 4 changes (2 additions & 2 deletions)

@@ -183,7 +183,7 @@ const contentPostgres = `# ===next===
 2021-10-12 08:25:44 GMT [23379] => [526-1] client=[local],db=exampledb,user=none LOG: duration: 0.018 ms execute <unnamed>: SHOW TRANSACTION ISOLATION LEVEL
 `
 
-func TestJoin(t *testing.T) {
+func TestSimpleJoin(t *testing.T) {
 	cases := []struct {
 		name     string
 		startPat string
@@ -215,7 +215,7 @@ func TestJoin(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			format := `{"log":"%s\n"}`
 			content := strings.ReplaceAll(tt.content, "# ===next===\n", "")
-			lines := make([]string, 0, 0)
+			lines := make([]string, 0)
 			for _, line := range strings.Split(content, "\n") {
 				if line == "" {
 					continue
plugin/action/mask/mask.go: 4 changes (2 additions & 2 deletions)

@@ -129,8 +129,8 @@ func verifyGroupNumbers(groups []int, totalGroups int, logger *zap.SugaredLogger
 
 func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
 	p.config = config.(*Config)
-	p.maskBuf = make([]byte, 0, params.PipelineSettings.AvgLogSize)
-	p.sourceBuf = make([]byte, 0, params.PipelineSettings.AvgLogSize)
+	p.maskBuf = make([]byte, 0, params.PipelineSettings.AvgEventSize)
+	p.sourceBuf = make([]byte, 0, params.PipelineSettings.AvgEventSize)
 	p.valueNodes = make([]*insaneJSON.Node, 0)
 	p.logger = params.Logger
 	p.config.Masks = compileMasks(p.config.Masks, p.logger)
plugin/action/throttle/throttle_test.go: 3 changes (2 additions & 1 deletion)

@@ -47,7 +47,8 @@ func (c *testConfig) runPipeline(t *testing.T) {
 	startTime := time.Now()
 	for {
 		index := rand.Int() % len(formats)
-		json := fmt.Sprintf(formats[index], time.Now().UTC().Format(time.RFC3339Nano))
+		// Format like RFC3339Nano, but with zero-padded nanoseconds, so all timestamps have equal length.
+		json := fmt.Sprintf(formats[index], time.Now().UTC().Format("2006-01-02T15:04:05.000000000Z07:00"))
 		input.In(10, sourceNames[rand.Int()%len(sourceNames)], 0, []byte(json))
 		if time.Since(startTime) > c.workTime {
 			break
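The layout swap in the throttle test matters because `time.RFC3339Nano` uses the `.999999999` fraction, which trims trailing zeros and therefore produces variable-width timestamps; the explicit `.000000000` layout always prints nine digits. A quick standalone demonstration:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	t := time.Date(2021, 11, 29, 12, 0, 0, 500_000_000, time.UTC)

	// RFC3339Nano ("...05.999999999Z07:00") drops trailing zeros:
	fmt.Println(t.Format(time.RFC3339Nano)) // 2021-11-29T12:00:00.5Z

	// A zero-padded layout keeps all timestamps the same length:
	fmt.Println(t.Format("2006-01-02T15:04:05.000000000Z07:00")) // 2021-11-29T12:00:00.500000000Z
}
```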
plugin/input/file/file.go: 2 changes (1 addition & 1 deletion)

@@ -197,7 +197,7 @@ func (p *Plugin) startWorkers() {
 	p.workers = make([]*worker, p.config.WorkersCount_)
 	for i := range p.workers {
 		p.workers[i] = &worker{
-			maxLogSize: p.params.PipelineSettings.MaxLogSize,
+			maxEventSize: p.params.PipelineSettings.MaxEventSize,
 		}
 		p.workers[i].start(p.params.Controller, p.jobProvider, p.config.ReadBufferSize, p.logger)
 	}
plugin/input/file/worker.go: 8 changes (4 additions & 4 deletions)

@@ -10,7 +10,7 @@ import (
 )
 
 type worker struct {
-	maxLogSize int
+	maxEventSize int
 }
 
 type inputer interface {
@@ -25,7 +25,7 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSize
 	accumBuffer := make([]byte, 0, readBufferSize)
 	readBuffer := make([]byte, readBufferSize)
 	var inBuffer []byte
-	shouldCheckMax := w.maxLogSize != 0
+	shouldCheckMax := w.maxEventSize != 0
 
 	var seqID uint64 = 0
 	for {
@@ -102,7 +102,7 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSize
 			} else {
 				inBuffer = readBuffer[processed : pos+1]
 			}
-			if shouldCheckMax && len(inBuffer) > w.maxLogSize {
+			if shouldCheckMax && len(inBuffer) > w.maxEventSize {
 				break
 			}
 			seqID = controller.In(sourceID, sourceName, offset, inBuffer, isVirgin)
@@ -121,7 +121,7 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSize
 		} else {
 			accumBuffer = append(accumBuffer, readBuffer[:read]...)
 			accumulated += read
-			if shouldCheckMax && len(accumBuffer) > w.maxLogSize {
+			if shouldCheckMax && len(accumBuffer) > w.maxEventSize {
 				break
 			}
 		}
plugin/input/file/worker_test.go: 14 changes (5 additions & 9 deletions)

@@ -25,42 +25,38 @@ func (i *inputerMock) In(sourceID pipeline.SourceID, sourceName string, offset int64
 }
 
 func TestWorkerWork(t *testing.T) {
-	type args struct {
-		file           string
-		readBufferSize int
-	}
 	tests := []struct {
 		name           string
-		maxLogSize     int
+		maxEventSize   int
 		inFile         string
 		readBufferSize int
 		expData        string
 	}{
 		{
 			name:           "should_ok_when_read_1_line",
-			maxLogSize:     1024,
+			maxEventSize:   1024,
 			inFile:         "abc\n",
 			readBufferSize: 1024,
 			expData:        "abc\n",
 		},
 		{
 			name:           "should_ok_and_empty_when_read_not_ready_line",
-			maxLogSize:     1024,
+			maxEventSize:   1024,
 			inFile:         "abc",
 			readBufferSize: 1024,
 			expData:        "",
 		},
 		{
 			name:           "should_ok_and_not_read_long_line",
-			maxLogSize:     2,
+			maxEventSize:   2,
 			inFile:         "abc\n",
 			readBufferSize: 1024,
 			expData:        "",
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			w := &worker{maxLogSize: tt.maxLogSize}
+			w := &worker{maxEventSize: tt.maxEventSize}
 			inputer := inputerMock{}
 			f, err := os.CreateTemp("/tmp", "worker_test")
 			require.NoError(t, err)
plugin/input/http/http.go: 2 changes (1 addition & 1 deletion)

@@ -101,7 +101,7 @@ func (p *Plugin) newReadBuff() interface{} {
 }
 
 func (p *Plugin) newEventBuffs() interface{} {
-	return make([]byte, 0, p.params.PipelineSettings.AvgLogSize)
+	return make([]byte, 0, p.params.PipelineSettings.AvgEventSize)
 }
 
 func (p *Plugin) getSourceID() pipeline.SourceID {
plugin/input/k8s/multiline_action.go: 18 changes (9 additions & 9 deletions)

@@ -7,12 +7,12 @@ import (
 )
 
 type MultilineAction struct {
-	config     *Config
-	logger     *zap.SugaredLogger
-	params     *pipeline.ActionPluginParams
-	maxLogSize int
-	logBuff    []byte
-	logSize    int
+	config       *Config
+	logger       *zap.SugaredLogger
+	params       *pipeline.ActionPluginParams
+	maxEventSize int
+	logBuff      []byte
+	logSize      int
 }
 
 const (
@@ -22,7 +22,7 @@ const (
 func (p *MultilineAction) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
 	p.logger = params.Logger
 	p.params = params
-	p.maxLogSize = params.PipelineSettings.MaxLogSize
+	p.maxEventSize = params.PipelineSettings.MaxEventSize
 	p.config = config.(*Config)
 
 	p.config.AllowedPodLabels_ = cfg.ListToMap(p.config.AllowedPodLabels)
@@ -41,8 +41,8 @@ func (p *MultilineAction) Do(event *pipeline.Event) pipeline.ActionResult {
 		p.logBuff = p.logBuff[:1]
 		return pipeline.ActionDiscard
 	}
-	if p.maxLogSize != 0 && p.logSize > p.maxLogSize {
-		p.logger.Errorf("logs will be discarded due to maxLogSize")
+	if p.maxEventSize != 0 && p.logSize > p.maxEventSize {
+		p.logger.Errorf("logs will be discarded due to maxEventSize")
 		p.logBuff = p.logBuff[:1]
 		return pipeline.ActionDiscard
 	}
plugin/output/elasticsearch/elasticsearch.go: 24 changes (12 additions & 12 deletions)

@@ -22,14 +22,14 @@ If a network error occurs, the batch will infinitely try to be delivered to the
 }*/
 
 type Plugin struct {
-	logger     *zap.SugaredLogger
-	client     *http.Client
-	config     *Config
-	avgLogSize int
-	time       string
-	batcher    *pipeline.Batcher
-	controller pipeline.OutputPluginController
-	mu         *sync.Mutex
+	logger       *zap.SugaredLogger
+	client       *http.Client
+	config       *Config
+	avgEventSize int
+	time         string
+	batcher      *pipeline.Batcher
+	controller   pipeline.OutputPluginController
+	mu           *sync.Mutex
 }
 
 //! config-params
@@ -104,7 +104,7 @@ func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
 func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
 	p.controller = params.Controller
 	p.logger = params.Logger
-	p.avgLogSize = params.PipelineSettings.AvgLogSize
+	p.avgEventSize = params.PipelineSettings.AvgEventSize
 	p.config = config.(*Config)
 	p.mu = &sync.Mutex{}
 
@@ -150,14 +150,14 @@ func (p *Plugin) Out(event *pipeline.Event) {
 func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
 	if *workerData == nil {
 		*workerData = &data{
-			outBuf: make([]byte, 0, p.config.BatchSize_*p.avgLogSize),
+			outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
 		}
 	}
 
 	data := (*workerData).(*data)
 	// handle too much memory consumption
-	if cap(data.outBuf) > p.config.BatchSize_*p.avgLogSize {
-		data.outBuf = make([]byte, 0, p.config.BatchSize_*p.avgLogSize)
+	if cap(data.outBuf) > p.config.BatchSize_*p.avgEventSize {
+		data.outBuf = make([]byte, 0, p.config.BatchSize_*p.avgEventSize)
 	}
 
 	data.outBuf = data.outBuf[:0]
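Both output plugins reuse a per-worker `outBuf` across batches but reallocate it when its capacity outgrows the expected batch footprint (`BatchSize_ * avgEventSize`), so one oversized batch cannot pin memory for the rest of the process lifetime. A minimal sketch of that reuse-or-shrink pattern, with illustrative names:

```go
package main

import "fmt"

// shrinkIfOversized returns a fresh buffer when the reused one has grown
// past limit (playing the role of BatchSize_ * avgEventSize); otherwise it
// keeps the backing array and only resets the length for the next batch.
func shrinkIfOversized(buf []byte, limit int) []byte {
	if cap(buf) > limit {
		return make([]byte, 0, limit)
	}
	return buf[:0]
}

func main() {
	buf := make([]byte, 0, 1024)
	buf = append(buf, make([]byte, 64*1024)...) // one unusually large batch
	buf = shrinkIfOversized(buf, 16*1024)
	fmt.Println(cap(buf)) // 16384: the oversized array is left for the GC
}
```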
plugin/output/file/file.go: 8 changes (4 additions & 4 deletions)

@@ -23,7 +23,7 @@ type Plugin struct {
 	controller   pipeline.OutputPluginController
 	logger       *zap.SugaredLogger
 	config       *Config
-	avgLogSize   int
+	avgEventSize int
 	batcher      *pipeline.Batcher
 	file         *os.File
 	ctx          context.Context
@@ -145,14 +145,14 @@ func (p *Plugin) Out(event *pipeline.Event) {
 func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
 	if *workerData == nil {
 		*workerData = &data{
-			outBuf: make([]byte, 0, p.config.BatchSize_*p.avgLogSize),
+			outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
 		}
 	}
 	data := (*workerData).(*data)
 
 	// handle too much memory consumption
-	if cap(data.outBuf) > p.config.BatchSize_*p.avgLogSize {
-		data.outBuf = make([]byte, 0, p.config.BatchSize_*p.avgLogSize)
+	if cap(data.outBuf) > p.config.BatchSize_*p.avgEventSize {
+		data.outBuf = make([]byte, 0, p.config.BatchSize_*p.avgEventSize)
 	}
 
 	outBuf := data.outBuf[:0]
plugin/output/file/helpers_test.go: 2 changes (1 addition & 1 deletion)

@@ -53,7 +53,7 @@ func newPipeline(t *testing.T, configOutput *Config) *pipeline.Pipeline {
 		Capacity:            4096,
 		MaintenanceInterval: time.Second * 100000,
 		AntispamThreshold:   0,
-		AvgLogSize:          2048,
+		AvgEventSize:        2048,
 		StreamField:         "stream",
 		Decoder:             "json",
 	}