Skip to content

Commit

Permalink
Reset busy action after Hold (#279)
Browse files Browse the repository at this point in the history
Reset busy action after Hold
  • Loading branch information
vadimalekseev authored Feb 1, 2023
1 parent 20d1678 commit 801d246
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 9 deletions.
26 changes: 26 additions & 0 deletions e2e/join_throttle/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
pipelines:
join_throttle:
settings:
event_timeout: 1s
input:
type: file
persistence_mode: async
watching_dir: SOME_DIR
offsets_file: SOME_FILE
offsets_op: reset
actions:
- type: join
field: message
start: '/^start/'
continue: '/^continue/'
- type: modify
ts: '2009-11-10T23:00:00Z'
- type: throttle
bucket_interval: 1m
buckets_count: 60
default_limit: 100
throttle_field: service
time_field: ts
output:
type: file
target_file: SOME_FILE
61 changes: 61 additions & 0 deletions e2e/join_throttle/join_throttle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package join_throttle

import (
"os"
"path"
"path/filepath"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type Config struct {
inputDir string
outputDir string
Count int
}

func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
c.inputDir = t.TempDir()
c.outputDir = t.TempDir()
offsetsDir := t.TempDir()

input := conf.Pipelines[pipelineName].Raw.Get("input")
input.Set("watching_dir", c.inputDir)
input.Set("filename_pattern", "input.log")
input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml"))

output := conf.Pipelines[pipelineName].Raw.Get("output")
output.Set("target_file", path.Join(c.outputDir, "output.log"))
}

func (c *Config) Send(t *testing.T) {
file, err := os.Create(path.Join(c.inputDir, "input.log"))
require.NoError(t, err)
defer file.Close()

for i := 0; i < c.Count; i++ {
_, err = file.WriteString(`{"message":"start "}` + "\n")
_ = file.Sync()
require.NoError(t, err)
}
}

func (c *Config) Validate(t *testing.T) {
logFilePattern := path.Join(c.outputDir, "*")

expectedEvents := 100 // because we are set default_limit: 100 in the throttle plugin

test.WaitProcessEvents(t, expectedEvents, 3*time.Second, 50*time.Second, logFilePattern)
matches := test.GetMatches(t, logFilePattern)
assert.True(t, len(matches) > 0, "no files with processed events")

got := test.CountLines(t, logFilePattern)

throttleAccuracy := got >= expectedEvents && got <= expectedEvents*2 // we don't know how many events we will get
assert.True(t, throttleAccuracy)
}
8 changes: 8 additions & 0 deletions e2e/start_work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/e2e/file_file"
"github.com/ozontech/file.d/e2e/http_file"
"github.com/ozontech/file.d/e2e/join_throttle"
"github.com/ozontech/file.d/e2e/kafka_file"
"github.com/ozontech/file.d/fd"
_ "github.com/ozontech/file.d/plugin/action/add_host"
Expand Down Expand Up @@ -99,6 +100,13 @@ func TestE2EStabilityWorkCase(t *testing.T) {
},
cfgPath: "./kafka_file/config.yml",
},
{
name: "join_throttle",
e2eTest: &join_throttle.Config{
Count: 1000,
},
cfgPath: "./join_throttle/config.yml",
},
}

for num, test := range testsList {
Expand Down
4 changes: 2 additions & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type Pipeline struct {
readOps atomic.Int64
maxSize int

//all pipeline`s metrics
// all pipeline`s metrics

inUseEventsMetric *prometheus.GaugeVec
eventPoolCapacityMetric *prometheus.GaugeVec
Expand Down Expand Up @@ -516,7 +516,7 @@ func (p *Pipeline) initProcs() {
}

func (p *Pipeline) newProc() *processor {
proc := NewProcessor(
proc := newProcessor(
p.metricsHolder,
p.activeProcs,
p.output,
Expand Down
6 changes: 4 additions & 2 deletions pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type processor struct {

var id = 0

func NewProcessor(
func newProcessor(
metricsHolder *metricsHolder,
activeCounter *atomic.Int32,
output OutputPlugin,
Expand Down Expand Up @@ -347,8 +347,10 @@ func (p *processor) Commit(event *Event) {
p.finalize(event, false, true)
}

// Propagate flushes an event after ActionHold.
func (p *processor) Propagate(event *Event) {
event.action.Inc()
nextActionIdx := event.action.Inc()
p.tryResetBusy(int(nextActionIdx - 1))
p.processSequence(event)
}

Expand Down
11 changes: 6 additions & 5 deletions test/file_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package test

import (
"bufio"
"log"
"os"
"path/filepath"
"testing"
"time"

"github.com/ozontech/file.d/pipeline"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// The file_base contains helpers function for testing file base output plugins
Expand Down Expand Up @@ -68,21 +68,22 @@ func CountLines(t *testing.T, pattern string) int {
lineCount := 0
for _, match := range matches {
file, err := os.Open(match)
if err != nil {
log.Fatalf("can't open file: %s", err.Error())
}
require.NoError(t, err)

fileScanner := bufio.NewScanner(file)
for fileScanner.Scan() {
lineCount++
}

require.NoError(t, file.Close())
}
return lineCount
}

func WaitProcessEvents(t *testing.T, count int, checkInterval, maxTime time.Duration, pattern string) {
tf := time.Now().Add(maxTime)
for tf.After(time.Now()) {
if count == CountLines(t, pattern) {
if count <= CountLines(t, pattern) {
return
}
time.Sleep(checkInterval)
Expand Down

0 comments on commit 801d246

Please sign in to comment.