-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwatcher.go
169 lines (140 loc) · 3.14 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package kiroku
import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"time"
)
func newWatcher(ctx context.Context, opts Options, onTrigger func(Filename) error, ts ...Type) *watcher {
var w watcher
w.ctx = ctx
w.opts = opts
w.onTrigger = onTrigger
// Initialize semaphores
w.s = make(semaphore, 1)
// Set types
w.ts = ts
// Increment jobs waiter
w.jobs.Add(1)
// Initialize watch job
go w.watch()
// Associate returning pointer to created Producer
return &w
}
type watcher struct {
ctx context.Context
onTrigger func(Filename) error
// Merging semaphore
s semaphore
// Types
ts []Type
opts Options
// Goroutine job waiter
jobs sync.WaitGroup
}
func (w *watcher) watch() {
var (
ok bool
err error
)
// Decrement jobs waitgroup when func is done
defer w.jobs.Done()
// Iterate until Producer is closed
for !isClosed(w.ctx) {
if ok, err = w.process(); err != nil {
err = fmt.Errorf("error processing: %v", err)
w.opts.OnError(err)
w.sleep(w.opts.ErrorDelay)
}
if !ok {
w.waitForNext()
}
}
}
func (w *watcher) processAll() (err error) {
var ok bool
// Iterate until Producer is closed
for {
if ok, err = w.process(); !ok || err != nil {
return
}
}
}
// process will process matches until:
// - No more matches are found
// - Watcher has been closed
func (w *watcher) process() (ok bool, err error) {
var filename Filename
// Get next file for the target prefix
if filename, ok, err = w.getNext(); err != nil {
err = fmt.Errorf("error getting next %+v filename: <%v>, sleeping for %v and trying again", w.ts, err, w.opts.EndOfResultsDelay)
return
}
if !ok {
return
}
// Call provided function
if err = w.onTrigger(filename); err != nil {
err = fmt.Errorf("error encountered during action for <%s>: <%v>, sleeping for %v and trying again", filename, err, w.opts.ErrorDelay)
return
}
return
}
func (w *watcher) getNext() (filename Filename, ok bool, err error) {
cleanDir := filepath.Clean(w.opts.Dir)
fn := func(iteratingName string, info os.FileInfo) (err error) {
if info.IsDir() {
// We are not interested in directories, return
return
}
if filepath.Dir(iteratingName) != cleanDir {
// Current item is not in the same directory, return
return
}
truncated := filepath.Base(iteratingName)
// Check to see if current file is a match for the current name and prefix
if filename, err = ParseFilename(truncated); err != nil {
err = nil
return
}
if filename.Name != w.opts.FullName() {
return
}
for _, t := range w.ts {
if filename.Filetype == t {
ok = true
return errBreak
}
}
return
}
// Iterate through files within directory
if err = walk(w.opts.Dir, fn); err == errBreak {
err = nil
}
return
}
func (w *watcher) waitForNext() {
select {
// Wait for semaphore signal
case <-w.s:
// Wait for context to be finished
case <-w.ctx.Done():
}
}
func (w *watcher) sleep(d time.Duration) {
select {
// Wait for timer duration to complete
case <-time.NewTimer(d).C:
// Wait for context to be finished
case <-w.ctx.Done():
}
}
func (w *watcher) waitToComplete() {
w.jobs.Wait()
}
func (w *watcher) trigger() {
w.s.send()
}