Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

file input plugin: define watching_dir from include patterns #567

Merged
merged 11 commits into from
Oct 9, 2024
Prev Previous commit
Next Next commit
check dir files in notify by base paths from include patterns
  • Loading branch information
DmitryRomanov committed Oct 7, 2024
commit 7b2603ce1d4949531913e126f7c6ad84b27f8ebb
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ require (
github.com/alicebob/miniredis/v2 v2.30.5
github.com/bitly/go-simplejson v0.5.1
github.com/bufbuild/protocompile v0.13.0
github.com/bmatcuk/doublestar/v4 v4.0.2
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cespare/xxhash/v2 v2.2.0
github.com/bmatcuk/doublestar/v4 v4.0.2
github.com/euank/go-kmsg-parser v2.0.0+incompatible
github.com/go-faster/jx v1.1.0
github.com/go-redis/redis v6.15.9+incompatible
Expand Down
6 changes: 3 additions & 3 deletions plugin/input/file/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ func TestProvierWatcherPaths(t *testing.T) {
config := tt.config
test.NewConfig(config, map[string]int{"gomaxprocs": runtime.GOMAXPROCS(0)})
metrics := newMetricCollection(
ctl.RegisterCounter("worker", "help_test"),
ctl.RegisterCounter("worker", "help_test"),
ctl.RegisterGauge("worker", "help_test"),
ctl.RegisterCounter("worker1", "help_test"),
ctl.RegisterCounter("worker2", "help_test"),
ctl.RegisterGauge("worker3", "help_test"),
)
jp := NewJobProvider(config, metrics, &zap.SugaredLogger{})

Expand Down
16 changes: 16 additions & 0 deletions plugin/input/file/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,12 @@ func (w *watcher) notify(e notify.Event, path string) {

if stat.IsDir() {
dirFilename := filename
check_dir:
for {
for _, path := range w.basePaths {
if path == dirFilename {
w.tryAddPath(filename)
break check_dir
}
}
if dirFilename == w.commonPath {
Expand All @@ -166,6 +168,20 @@ func (w *watcher) notify(e notify.Event, path string) {
return
}

dirFilename := filepath.Dir(filename)
check_file:
for {
for _, path := range w.basePaths {
if path == dirFilename {
break check_file
}
}
if dirFilename == w.commonPath {
return
}
dirFilename = filepath.Dir(dirFilename)
}

w.logger.Infof("%s %s", e, path)

for _, pattern := range w.paths.Include {
Expand Down
1 change: 1 addition & 0 deletions plugin/input/file/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func TestWatcher(t *testing.T) {
}
}

// nolint:gocritic
func TestWatcherPaths(t *testing.T) {
dir := t.TempDir()
shouldCreate := atomic.Int64{}
Expand Down