Skip to content

Commit

Permalink
Fix tests for working pipeline
Browse files Browse the repository at this point in the history
- Fix tests for file input and output plugin
- fix throttle, s3
- make input file plugin work with truncation and in low CPU mode
  • Loading branch information
Gleb Zakharov committed Nov 1, 2021
1 parent 1528ba6 commit bd5f168
Show file tree
Hide file tree
Showing 17 changed files with 153 additions and 110 deletions.
6 changes: 2 additions & 4 deletions cmd/file.d_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build e2e

package main

import (
Expand Down Expand Up @@ -66,10 +68,6 @@ const testTime = 10 * time.Minute
// It's something like fuzz testing. file.d shouldn't crash/panic or hang for infinite time.
// E.g. keep this test running while you are sleeping :)
func TestEndToEnd(t *testing.T) {
if testing.Short() {
t.Skip("skipping testing in short mode")
}

configFilename := "./../testdata/config/e2e.yaml"
iterationInterval := time.Second * 10
writerCount := 8
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/protobuf v1.4.2 // indirect
github.com/golang/snappy v0.0.3 // indirect
Expand Down Expand Up @@ -85,7 +86,7 @@ require (
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect
golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 // indirect
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ github.com/frankban/quicktest v1.10.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq
github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-asn1-ber/asn1-ber v1.3.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
Expand Down Expand Up @@ -437,6 +439,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
2 changes: 0 additions & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
}

event := p.eventPool.get()

dec := decoder.NO
if p.decoder == decoder.AUTO {
dec = p.suggestedDecoder
Expand Down Expand Up @@ -329,7 +328,6 @@ func (p *Pipeline) streamEvent(event *Event) uint64 {
if p.useSpread {
event.SourceID = SourceID(event.SeqID % uint64(p.procCount.Load()))
}

if !p.disableStreams {
node := event.Root.Dig(p.settings.StreamField)
if node != nil {
Expand Down
35 changes: 22 additions & 13 deletions plugin/action/throttle/throttle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package throttle
import (
"fmt"
"math/rand"
"sync"
"testing"
"time"

Expand All @@ -12,6 +11,8 @@ import (
"github.com/ozonru/file.d/pipeline"
"github.com/ozonru/file.d/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

type testConfig struct {
Expand All @@ -27,15 +28,14 @@ var formats = []string{
`{"time":"%s","k8s_ns":"not_matched","k8s_pod":"pod_3"}`,
}

func (c *testConfig) runPipeline() {
func (c *testConfig) runPipeline(t *testing.T) {
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, c.config, pipeline.MatchModeAnd, nil, false))
wg := &sync.WaitGroup{}
wg.Add(c.eventsTotal)
wgWithDeadline := atomic.NewInt32(int32(c.eventsTotal))

outEvents := make([]*pipeline.Event, 0)
output.SetOutFn(func(e *pipeline.Event) {
outEvents = append(outEvents, e)
wg.Done()
wgWithDeadline.Dec()
})

sourceNames := []string{
Expand All @@ -47,16 +47,24 @@ func (c *testConfig) runPipeline() {
startTime := time.Now()
for {
index := rand.Int() % len(formats)
json := fmt.Sprintf(formats[index], time.Now().Format(time.RFC3339Nano))

json := fmt.Sprintf(formats[index], time.Now().UTC().Format(time.RFC3339Nano))
input.In(10, sourceNames[rand.Int()%len(sourceNames)], 0, []byte(json))
if time.Since(startTime) > c.workTime {
break
}
}

p.Stop()
wg.Wait()
tnow := time.Now()
for {
if time.Since(tnow) > 10*time.Second {
require.FailNow(t, "too long act")
}
if wgWithDeadline.Load() <= 0 {
break
}
time.Sleep(10 * time.Millisecond)
}

assert.Equal(c.t, c.eventsTotal, len(outEvents), "wrong in events count")
}
Expand Down Expand Up @@ -92,7 +100,7 @@ func TestThrottle(t *testing.T) {
workTime := config.BucketInterval_ * time.Duration(iterations)

tconf := testConfig{t, config, eventsTotal, workTime}
tconf.runPipeline()
tconf.runPipeline(t)
}

func TestSizeThrottle(t *testing.T) {
Expand All @@ -106,7 +114,8 @@ func TestSizeThrottle(t *testing.T) {
iterations := 5

totalBuckets := iterations + 1
eventsTotal := totalBuckets * (limitA/(len(formats[0])+dateLen) + limitB/(len(formats[1])+dateLen) + defaultLimit/(len(formats[2])+dateLen))
test := limitA/(len(formats[0])+dateLen-2) + limitB/(len(formats[1])+dateLen-2) + defaultLimit/(len(formats[2])+dateLen-2)
eventsTotal := totalBuckets * test

config := &Config{
Rules: []RuleConfig{
Expand All @@ -128,7 +137,7 @@ func TestSizeThrottle(t *testing.T) {
workTime := config.BucketInterval_ * time.Duration(iterations)

tconf := testConfig{t, config, eventsTotal, workTime}
tconf.runPipeline()
tconf.runPipeline(t)
}

func TestMixedThrottle(t *testing.T) {
Expand All @@ -142,7 +151,7 @@ func TestMixedThrottle(t *testing.T) {

totalBuckets := iterations + 1
defaultLimitDelta := totalBuckets * defaultLimit
eventsTotal := totalBuckets*(limitA+limitB/avgMessageSize) + defaultLimitDelta
eventsTotal := totalBuckets*(limitA+(limitB/avgMessageSize)) + defaultLimitDelta

config := &Config{
Rules: []RuleConfig{
Expand All @@ -163,5 +172,5 @@ func TestMixedThrottle(t *testing.T) {
workTime := config.BucketInterval_ * time.Duration(iterations)

tconf := testConfig{t, config, eventsTotal, workTime}
tconf.runPipeline()
tconf.runPipeline(t)
}
9 changes: 8 additions & 1 deletion plugin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ But update events don't work with symlinks, so watcher also periodically manuall
> It isn't a `file.d` issue. The data may have been written just before the file truncation. In this case, you may miss to read some events.
> If you care about the delivery, you should also know that the `logrotate` manual clearly states that copy/truncate may cause data loss even on a rotating stage.
> So use copy/truncate or similar actions only if your data isn't critical.
> In order to reduce potential harm of truncation, you can turn on notifications of file modifications.
> By default the plugin is notified only on file creations. Note that following for modifications is more CPU intensive.
**Reading docker container log files:**
Expand Down Expand Up @@ -146,7 +148,7 @@ type Config struct {
//> @3@4@5@6
//>
//> It defines how often to report statistical information to stdout
ReportInterval cfg.Duration `json:"report_interval" default:"10s" parse:"duration"` //*
ReportInterval cfg.Duration `json:"report_interval" default:"5s" parse:"duration"` //*
ReportInterval_ time.Duration

//> @3@4@5@6
Expand All @@ -155,6 +157,11 @@ type Config struct {
//> @maintenance
MaintenanceInterval cfg.Duration `json:"maintenance_interval" default:"10s" parse:"duration"` //*
MaintenanceInterval_ time.Duration

//> @3@4@5@6
//>
//> It turns on watching for file modifications. Turning it on cause more CPU work, but it is more probable to catch file truncation
ShouldWatchModifications bool `json:"should_watch_file_modifications" default:"false"` //*
}

func init() {
Expand Down
33 changes: 13 additions & 20 deletions plugin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ func pluginConfig(opts ...string) *Config {
}

config := &Config{
WatchingDir: filesDir,
OffsetsFile: filepath.Join(offsetsDir, offsetsFile),
PersistenceMode: "async",
OffsetsOp: op,
WatchingDir: filesDir,
OffsetsFile: filepath.Join(offsetsDir, offsetsFile),
PersistenceMode: "async",
OffsetsOp: op,
MaintenanceInterval: "100ms",
}

_ = cfg.Parse(config, map[string]int{"gomaxprocs": runtime.GOMAXPROCS(0)})
Expand Down Expand Up @@ -368,6 +369,7 @@ func TestWatch(t *testing.T) {
run(&test.Case{
Prepare: func() {
file = createTempFile()
addString(file, content, true, true)
},
Act: func(p *pipeline.Pipeline) {
for x := 0; x < iterations; x++ {
Expand All @@ -376,18 +378,16 @@ func TestWatch(t *testing.T) {
dir = filepath.Join(filepath.Dir(file), dir)
_ = os.Mkdir(dir, 0o770)

err := ioutil.WriteFile(filepath.Join(dir, "new_file"), []byte(content), perm)
err := os.WriteFile(filepath.Join(dir, "new_file"), []byte(content), perm)
if err != nil {
panic(err.Error())
}
err = ioutil.WriteFile(filepath.Join(dir, "other_file"), []byte(content), perm)
err = os.WriteFile(filepath.Join(dir, "other_file"), []byte(content), perm)
if err != nil {
panic(err.Error())
}
}(dir)
}

addString(file, content, true, true)
},
Assert: func(p *pipeline.Pipeline) {
assert.Equal(t, eventCount, p.GetEventsTotal(), "wrong event count")
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestReadContinue(t *testing.T) {
}

for i := range inputEvents {
assert.Equal(t, inputEvents[i], outputEvents[i], "wrong event")
require.Equal(t, inputEvents[i], outputEvents[i], "wrong event")
}

assertOffsetsAreEqual(t, genOffsetsContent(file, size), getContent(getConfigByPipeline(p).OffsetsFile))
Expand Down Expand Up @@ -940,10 +940,6 @@ func TestRotationRenameWhileNotWorking(t *testing.T) {
}

func TestTruncation(t *testing.T) {
if testing.Short() {
t.Skip("skipping testing in short mode")
}

file := ""
x := atomic.NewInt32(2)
run(&test.Case{
Expand All @@ -952,29 +948,26 @@ func TestTruncation(t *testing.T) {
file = createTempFile()
addString(file, `"line_1"`, true, false)
addString(file, `"line_2"`, true, false)
addString(file, `"line_3"`, true, true)

test.WaitForEvents(x)

truncateFile(file)
addString(file, `"line_3"`, true, true)

addString(file, `"line_4"`, true, true)
addString(file, `"line_5"`, true, true)
},
Assert: func(p *pipeline.Pipeline) {
assert.Equal(t, 5, p.GetEventsTotal(), "wrong events count")
assertOffsetsAreEqual(t, genOffsetsContent(file, (len(`"line_1"`)+newLine)*3), getContent(getConfigByPipeline(p).OffsetsFile))
assertOffsetsAreEqual(t, genOffsetsContent(file, (len(`"line_1"`)+newLine)*2), getContent(getConfigByPipeline(p).OffsetsFile))
},
Out: func(event *pipeline.Event) {
logger.Errorf("event=%v", event)
x.Dec()
},
}, 5)
}

func TestTruncationSeq(t *testing.T) {
if testing.Short() {
t.Skip("skipping testing in short mode")
}

p, _, _ := test.NewPipelineMock(nil, "passive")
p.SetInput(getInputInfo())
p.Start()
Expand Down
Loading

0 comments on commit bd5f168

Please sign in to comment.