workerpool.go

package workerpool

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"github.com/gammazero/deque"
)

const (
	// If workers are idle for at least this period of time, then stop a
	// worker.
	idleTimeout = 2 * time.Second
)

// New creates and starts a pool of worker goroutines.
//
// The maxWorkers parameter specifies the maximum number of workers that can
// execute tasks concurrently. When there are no incoming tasks, workers are
// gradually stopped until there are no remaining workers.
func New(maxWorkers int) *WorkerPool {
	// There must be at least one worker.
	if maxWorkers < 1 {
		maxWorkers = 1
	}

	pool := &WorkerPool{
		maxWorkers:  maxWorkers,
		taskQueue:   make(chan func()),
		workerQueue: make(chan func()),
		stopSignal:  make(chan struct{}),
		stoppedChan: make(chan struct{}),
	}

	// Start the task dispatcher.
	go pool.dispatch()

	return pool
}
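
// A minimal usage sketch (hedged; wp, the fmt call, and the task body are
// illustrative, not part of this package): create a pool, submit work, and
// stop the pool when done so the dispatcher goroutine exits.
//
//	wp := workerpool.New(2)
//	wp.Submit(func() {
//		fmt.Println("hello from a worker")
//	})
//	wp.StopWait()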

// WorkerPool is a collection of goroutines, where the number of concurrent
// goroutines processing requests does not exceed the specified maximum.
type WorkerPool struct {
	maxWorkers   int
	taskQueue    chan func()
	workerQueue  chan func()
	stoppedChan  chan struct{}
	stopSignal   chan struct{}
	waitingQueue deque.Deque[func()]
	stopLock     sync.Mutex
	stopOnce     sync.Once
	stopped      bool
	waiting      int32
	wait         bool
}

// Size returns the maximum number of concurrent workers.
func (p *WorkerPool) Size() int {
	return p.maxWorkers
}

// Stop stops the worker pool and waits for only currently running tasks to
// complete. Pending tasks that are not currently running are abandoned. Tasks
// must not be submitted to the worker pool after calling stop.
//
// Since creating the worker pool starts at least one goroutine, for the
// dispatcher, Stop() or StopWait() should be called when the worker pool is no
// longer needed.
func (p *WorkerPool) Stop() {
	p.stop(false)
}

// StopWait stops the worker pool and waits for all queued tasks to complete.
// No additional tasks may be submitted, but all pending tasks are executed by
// workers before this function returns.
func (p *WorkerPool) StopWait() {
	p.stop(true)
}
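
// A hedged sketch of the difference between Stop and StopWait, using an
// illustrative longTask closure:
//
//	wp.Submit(longTask)
//	wp.Stop() // waits only for running tasks; a still-queued longTask is dropped
//
// versus
//
//	wp.Submit(longTask)
//	wp.StopWait() // returns only after longTask has executed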

// Stopped returns true if this worker pool has been stopped.
func (p *WorkerPool) Stopped() bool {
	p.stopLock.Lock()
	defer p.stopLock.Unlock()
	return p.stopped
}

// Submit enqueues a function for a worker to execute.
//
// Any external values needed by the task function must be captured in a
// closure. Any return values should be returned over a channel that is
// captured in the task function closure.
//
// Submit will not block regardless of the number of tasks submitted. Each task
// is immediately given to an available worker or to a newly started worker. If
// there are no available workers, and the maximum number of workers are
// already created, then the task is put onto a waiting queue.
//
// When there are tasks on the waiting queue, any additional new tasks are put
// on the waiting queue. Tasks are removed from the waiting queue as workers
// become available.
//
// As long as no new tasks arrive, one available worker is shut down each time
// period until there are no more idle workers. Since the time to start new
// goroutines is not significant, there is no need to retain idle workers
// indefinitely.
func (p *WorkerPool) Submit(task func()) {
	if task != nil {
		p.taskQueue <- task
	}
}
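
// A hedged sketch of returning a value from a task, as described above:
// capture a result channel in the closure. compute is a hypothetical function
// used only for illustration.
//
//	resultChan := make(chan int, 1)
//	wp.Submit(func() {
//		resultChan <- compute()
//	})
//	result := <-resultChan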

// SubmitWait enqueues the given function and waits for it to be executed.
func (p *WorkerPool) SubmitWait(task func()) {
	if task == nil {
		return
	}
	doneChan := make(chan struct{})
	p.taskQueue <- func() {
		task()
		close(doneChan)
	}
	<-doneChan
}
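
// A hedged usage sketch: SubmitWait makes a single task synchronous, so the
// write below is safe to read afterward. The variable n is illustrative.
//
//	var n int
//	wp.SubmitWait(func() { n = 42 })
//	fmt.Println(n) // the task has completed; n == 42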

// WaitingQueueSize returns the count of tasks in the waiting queue.
func (p *WorkerPool) WaitingQueueSize() int {
	return int(atomic.LoadInt32(&p.waiting))
}

// Pause causes all workers to wait on the given Context, thereby making them
// unavailable to run tasks. Pause returns when all workers are waiting. Tasks
// can continue to be queued to the workerpool, but are not executed until the
// Context is canceled or times out.
//
// Calling Pause when the worker pool is already paused causes Pause to wait
// until all previous pauses are canceled. This allows a goroutine to take
// control of pausing and unpausing the pool as soon as other goroutines have
// unpaused it.
//
// When the workerpool is stopped, workers are unpaused and queued tasks are
// executed during StopWait.
func (p *WorkerPool) Pause(ctx context.Context) {
	p.stopLock.Lock()
	defer p.stopLock.Unlock()
	if p.stopped {
		return
	}
	ready := new(sync.WaitGroup)
	ready.Add(p.maxWorkers)
	for i := 0; i < p.maxWorkers; i++ {
		p.Submit(func() {
			ready.Done()
			select {
			case <-ctx.Done():
			case <-p.stopSignal:
			}
		})
	}
	// Wait for workers to all be paused
	ready.Wait()
}
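
// A hedged sketch of pausing the pool for one second via a Context timeout,
// per the behavior described above; wp is an illustrative pool value.
//
//	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
//	defer cancel()
//	wp.Pause(ctx) // returns once every worker is waiting
//	// Tasks submitted here are queued, and run after the timeout expires.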

// dispatch sends the next queued task to an available worker.
func (p *WorkerPool) dispatch() {
	defer close(p.stoppedChan)
	timeout := time.NewTimer(idleTimeout)
	var workerCount int
	var idle bool
	var wg sync.WaitGroup

Loop:
	for {
		// As long as tasks are in the waiting queue, incoming tasks are put
		// into the waiting queue and tasks to run are taken from the waiting
		// queue. Once the waiting queue is empty, then go back to submitting
		// incoming tasks directly to available workers.
		if p.waitingQueue.Len() != 0 {
			if !p.processWaitingQueue() {
				break Loop
			}
			continue
		}

		select {
		case task, ok := <-p.taskQueue:
			if !ok {
				break Loop
			}
			// Got a task to do.
			select {
			case p.workerQueue <- task:
			default:
				// Create a new worker, if not at max.
				if workerCount < p.maxWorkers {
					wg.Add(1)
					go worker(task, p.workerQueue, &wg)
					workerCount++
				} else {
					// Enqueue task to be executed by next available worker.
					p.waitingQueue.PushBack(task)
					atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
				}
			}
			idle = false
		case <-timeout.C:
			// Timed out waiting for work to arrive. Kill a ready worker if
			// pool has been idle for a whole timeout.
			if idle && workerCount > 0 {
				if p.killIdleWorker() {
					workerCount--
				}
			}
			idle = true
			timeout.Reset(idleTimeout)
		}
	}

	// If instructed to wait, then run tasks that are already queued.
	if p.wait {
		p.runQueuedTasks()
	}

	// Stop all remaining workers as they become ready.
	for workerCount > 0 {
		p.workerQueue <- nil
		workerCount--
	}
	wg.Wait()

	timeout.Stop()
}

// worker executes tasks and stops when it receives a nil task.
func worker(task func(), workerQueue chan func(), wg *sync.WaitGroup) {
	for task != nil {
		task()
		task = <-workerQueue
	}
	wg.Done()
}

// stop tells the dispatcher to exit, and whether or not to complete queued
// tasks.
func (p *WorkerPool) stop(wait bool) {
	p.stopOnce.Do(func() {
		// Signal that workerpool is stopping, to unpause any paused workers.
		close(p.stopSignal)
		// Acquire stopLock to wait for any pause in progress to complete. All
		// in-progress pauses will complete because the stopSignal unpauses the
		// workers.
		p.stopLock.Lock()
		// The stopped flag prevents any additional paused workers. This makes
		// it safe to close the taskQueue.
		p.stopped = true
		p.stopLock.Unlock()
		p.wait = wait
		// Close task queue and wait for currently running tasks to finish.
		close(p.taskQueue)
	})
	<-p.stoppedChan
}

// processWaitingQueue puts new tasks onto the waiting queue, and removes
// tasks from the waiting queue as workers become available. Returns false if
// worker pool is stopped.
func (p *WorkerPool) processWaitingQueue() bool {
	select {
	case task, ok := <-p.taskQueue:
		if !ok {
			return false
		}
		p.waitingQueue.PushBack(task)
	case p.workerQueue <- p.waitingQueue.Front():
		// A worker was ready, so gave task to worker.
		p.waitingQueue.PopFront()
	}
	atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
	return true
}

// killIdleWorker stops a ready worker by sending it a nil task. It returns
// true if a worker was stopped, or false if no worker was ready.
func (p *WorkerPool) killIdleWorker() bool {
	select {
	case p.workerQueue <- nil:
		// Sent kill signal to worker.
		return true
	default:
		// No ready workers. All, if any, workers are busy.
		return false
	}
}

// runQueuedTasks removes each task from the waiting queue and gives it to
// workers until queue is empty.
func (p *WorkerPool) runQueuedTasks() {
	for p.waitingQueue.Len() != 0 {
		// A worker is ready, so give task to worker.
		p.workerQueue <- p.waitingQueue.PopFront()
		atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
	}
}