Skip to content

Commit

Permalink
start to fix output fine plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Gleb Zakharov committed Nov 1, 2021
1 parent bd5f168 commit 1aa2997
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 4 deletions.
2 changes: 1 addition & 1 deletion plugin/input/k8s/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestAllowedLabels(t *testing.T) {
assert.Nil(t, outEvents[1].Root.Dig("k8s_label_denied_label"), "extra label in event")
}

func TestJoin(t *testing.T) {
func TestJoinK8S(t *testing.T) {
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(MultilineActionFactory, config(), pipeline.MatchModeAnd, nil, false))
wg := &sync.WaitGroup{}
wg.Add(4)
Expand Down
4 changes: 4 additions & 0 deletions plugin/output/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/ozonru/file.d/cfg"
"github.com/ozonru/file.d/fd"
"github.com/ozonru/file.d/logger"
"github.com/ozonru/file.d/longpanic"
"github.com/ozonru/file.d/pipeline"

Expand Down Expand Up @@ -120,6 +121,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.idx = p.getStartIdx()
p.createNew()
p.setNextSealUpTime()
logger.Errorf("time setted")

if p.file == nil {
p.logger.Panic("file struct is nil!")
Expand Down Expand Up @@ -198,6 +200,7 @@ func (p *Plugin) write(data []byte) {

func (p *Plugin) createNew() {
p.tsFileName = fmt.Sprintf("%d%s%s%s", time.Now().Unix(), fileNameSeparator, p.fileName, p.fileExtension)
logger.Errorf("tsFileName in createNew=%s", p.tsFileName)
f := fmt.Sprintf("%s%s", p.targetDir, p.tsFileName)
pattern := fmt.Sprintf("%s*%s%s%s", p.targetDir, fileNameSeparator, p.fileName, p.fileExtension)
matches, err := filepath.Glob(pattern)
Expand Down Expand Up @@ -234,6 +237,7 @@ func (p *Plugin) sealUp() {
p.logger.Panicf("could not close file: %s, error: %s", oldFile.Name(), err.Error())
}

logger.Errorf("sealing in %d, newFile: %s", time.Now().Unix(), newFileName)
if p.SealUpCallback != nil {
longpanic.Go(func() { p.SealUpCallback(newFileName) })
}
Expand Down
7 changes: 5 additions & 2 deletions plugin/output/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/ozonru/file.d/cfg"
"github.com/ozonru/file.d/logger"
"github.com/ozonru/file.d/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -226,19 +227,21 @@ func TestStart(t *testing.T) {
assert.NotNil(t, p, "could not create new pipeline")

p.Start()
time.Sleep(300 * time.Microsecond)

// check log file created and empty
matches := test.GetMatches(t, logFilePattern)
assert.Equal(t, 1, len(matches))
require.Equal(t, 1, len(matches))

tsFileName := matches[0]
test.CheckZero(t, tsFileName, "log file is not created or is not empty")
logger.Errorf("tsFileName=%s", tsFileName)

// send events
logger.Errorf("send pack, t=%s", time.Now().Unix())
packSize := test.SendPack(t, p, tests.firstPack)
totalSent += packSize
time.Sleep(100 * time.Millisecond)
logger.Errorf("after sleep")

// check that plugin wrote into the file
require.Equal(t, packSize, test.CheckNotZero(t, tsFileName, "check log file has data"), "plugin did not write into the file")
Expand Down
2 changes: 1 addition & 1 deletion test/file_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func SendPack(t *testing.T, p *pipeline.Pipeline, msgs []Msg) int64 {
func ClearDir(t *testing.T, dir string) {
t.Helper()
if err := os.RemoveAll(dir); err != nil {
t.Fatalf("coudl not delete dirs and files adter tests, error: %s", err.Error())
t.Fatalf("could not delete dirs and files after tests, error: %s", err.Error())
}
}

Expand Down

0 comments on commit 1aa2997

Please sign in to comment.