-
Notifications
You must be signed in to change notification settings - Fork 43
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Moved code around. Renamed internal properties to be slightly cleaner
- Loading branch information
1 parent
3e8636f
commit bbf50d0
Showing
12 changed files
with
711 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
[*.{js,json}] | ||
indent_style = space | ||
indent_size = 2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,5 @@ | ||
# better-queue | ||
Better Queue for NodeJS | ||
|
||
|
||
Queues can be quite hard. There's a lot of cases to consider. Luckily, better-queue handles all of these cases! |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,198 @@ | ||
|
||
var uuid = require('node-uuid'); | ||
var Ticket = require('./ticket'); | ||
var Tickets = require('./tickets'); | ||
|
||
function Queue(process, opts) { | ||
var self = this; | ||
opts = opts || {}; | ||
if (typeof process === 'object') { | ||
opts = process || {}; | ||
} | ||
if (typeof process === 'function') { | ||
opts.process = process; | ||
} | ||
if (!opts.process) { | ||
throw new Error("Queue has no process function."); | ||
} | ||
|
||
opts = opts || {}; | ||
|
||
self.accept = opts.accept || function (input, cb) { cb(null, input) }; | ||
self.process = opts.process || function (task, cb) { cb(null, {}) }; | ||
self.merge = opts.merge || function (task1, task2, cb) { cb(null, task2) }; | ||
self.running = opts.running || function (worker, task, cb) { cb(null, task) }; | ||
|
||
self.fifo = opts.fifo || false; | ||
self.batchSize = opts.batchSize || 1; | ||
self.concurrent = opts.concurrent || 1; | ||
self.idleTimeout = opts.idleTimeout || 0; | ||
self.taskTimeout = opts.taskTimeout || Infinity; | ||
self.maxRetries = opts.maxRetries || Infinity; | ||
|
||
self._processes = 0; | ||
self._queue = []; | ||
self._tasks = {}; // Map of taskId => task | ||
self._retries = {}; // Map of taskId => retries | ||
self._workers = {}; // Map of taskId => active job | ||
self._tickets = {}; // Map of taskId => tickets | ||
|
||
// TODO: Hold queue (wait for some event; like reconnect) | ||
// TODO: Load from persistent storage | ||
} | ||
|
||
Queue.prototype.push = function (input, cb) { | ||
var self = this; | ||
var ticket = new Ticket(); | ||
if (cb) { | ||
ticket | ||
.on('done', function (result) { cb(null, result) }) | ||
.on('error', function (err) { cb(err) }) | ||
} | ||
self.accept(input, function (err, task) { | ||
if (err || task === undefined || task === false || task === null) { | ||
return ticket.failed('input_rejected'); | ||
} | ||
var taskId = task.id || uuid.v4(); | ||
ticket.accept(); | ||
self._updateQueue(taskId, task, function (err) { | ||
if (err) return ticket.failed('failed_to_queue'); | ||
if (!self._tickets[taskId]) { | ||
self._tickets[taskId] = new Tickets(); | ||
} | ||
self._tickets[taskId].push(ticket); | ||
ticket.queued(); | ||
setImmediate(function () { | ||
self._processNext(); | ||
}) | ||
}); | ||
}) | ||
return ticket; | ||
} | ||
|
||
Queue.prototype._updateQueue = function (taskId, task, cb) { | ||
var self = this; | ||
if (self._workers[taskId]) { | ||
self.running(self._workers[taskId], task, function (err, task) { | ||
if (err) return cb(err); | ||
if (task === undefined || task === false || task === null) return cb(); | ||
self._enqueueTask(taskId, task, cb); | ||
}) | ||
} else { | ||
self._enqueueTask(taskId, task, cb); | ||
} | ||
} | ||
|
||
Queue.prototype._enqueueTask = function (taskId, task, cb) { | ||
var self = this; | ||
// Check if already in queue | ||
if (self._tasks[taskId] !== undefined) { | ||
self.merge(self._tasks[taskId], task, function (err, newTask) { | ||
if (err) return cb({ message: 'failed_task_merge' }); | ||
if (newTask !== undefined) { | ||
self._tasks[taskId] = newTask; | ||
} | ||
cb(); | ||
}); | ||
} else { | ||
self._tasks[taskId] = task; | ||
self._queue.push(taskId); | ||
cb(); | ||
} | ||
} | ||
|
||
Queue.prototype._processNext = function () { | ||
var self = this; | ||
if (!self._queue.length) return; | ||
if (self._processes >= self.concurrent) return; | ||
self._processes++; | ||
var batch = []; | ||
while (self._queue.length && batch.length < self.batchSize) { | ||
var taskId = self._queue[self.fifo ? 'pop' : 'shift'](); | ||
var task = self._tasks[taskId]; | ||
batch.push({ | ||
id: taskId, | ||
task: self._tasks[taskId], | ||
ticket: self._tickets[taskId] | ||
}); | ||
delete self._tasks[taskId]; | ||
delete self._tickets[taskId]; | ||
} | ||
if (!batch.length) return; | ||
setImmediate(function () { | ||
self._startTask(batch); | ||
}) | ||
if (self._queue.length) { | ||
setImmediate(function () { | ||
self._processNext(); | ||
}) | ||
} | ||
} | ||
|
||
Queue.prototype._startTask = function (batch) { | ||
var self = this; | ||
var task = null; | ||
var tickets = {}; | ||
var failedTask = function (taskId, msg) { | ||
if (tickets[taskId]) { | ||
tickets[taskId].failed(msg); | ||
delete tickets[taskId]; | ||
} | ||
setTimeout(function () { | ||
self._processNext(); | ||
}, self.idleTimeout); | ||
} | ||
var finishedTask = function (taskId, result) { | ||
if (tickets[taskId]) { | ||
tickets[taskId].finish(result); | ||
delete tickets[taskId]; | ||
} | ||
setTimeout(function () { | ||
self._processNext(); | ||
}, self.idleTimeout); | ||
} | ||
batch.forEach(function (item) { | ||
if (item.ticket) { | ||
tickets[item.id] = item.ticket; | ||
tickets[item.id].started(item.task.total); | ||
} | ||
}); | ||
if (self.batchSize === 1) { | ||
task = batch[0].task; | ||
} else { | ||
var tasks = {}; | ||
batch.forEach(function (item) { | ||
tasks[item.id] = item.task; | ||
}); | ||
task = { | ||
tasks: tasks, | ||
finish: finishedTask, | ||
failed: failedTask, | ||
progress: function (taskId, current) { | ||
if (tickets[taskId]) { | ||
tickets[taskId].progress(current); | ||
} | ||
}, | ||
} | ||
} | ||
var worker = self.process(task, function (err, result) { | ||
self._processes--; | ||
if (err) { | ||
// TODO: Retry? | ||
for (var taskId in tickets) { | ||
failedTask(taskId, err.message || err); | ||
delete self._workers[taskId]; | ||
} | ||
} else { | ||
for (var taskId in tickets) { | ||
finishedTask(taskId, result); | ||
delete self._workers[taskId]; | ||
} | ||
} | ||
}); | ||
batch.forEach(function (item) { | ||
self._workers[item.id] = worker || {}; | ||
}); | ||
} | ||
|
||
module.exports = Queue; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
|
||
var util = require('util'); | ||
var EE = require('events').EventEmitter; | ||
var ETA = require('node-eta'); | ||
|
||
function Ticket(opts) { | ||
this.isAccepted = false; | ||
this.isQueued = false; | ||
this.isStarted = false; | ||
this.isFailed = false; | ||
this.isFinished = false; | ||
this.result = null; | ||
this.status = 'created'; | ||
this.eta = new ETA(); | ||
} | ||
|
||
util.inherits(Ticket, EE); | ||
|
||
Ticket.prototype.accept = function () { | ||
this.status = 'accepted'; | ||
this.isAccepted = true; | ||
} | ||
|
||
Ticket.prototype.queued = function () { | ||
this.status = 'queued'; | ||
this.isQueued = true; | ||
} | ||
|
||
Ticket.prototype.unqueued = function () { | ||
this.status = 'accepted'; | ||
this.isQueued = false; | ||
} | ||
|
||
Ticket.prototype.started = function (total) { | ||
this.eta.count = total || 1; | ||
this.eta.start(); | ||
this.isStarted = true; | ||
this.status = 'in-progress'; | ||
} | ||
|
||
Ticket.prototype.failed = function (msg) { | ||
this.isFailed = true; | ||
this.isFinished = true; | ||
this.status = 'failed'; | ||
this.emit('error', new Error(msg)); | ||
} | ||
|
||
Ticket.prototype.finish = function (result) { | ||
this.eta.done = this.eta.count; | ||
this.isFinished = true; | ||
this.status = 'finished'; | ||
this.result = result; | ||
this.emit('done', this.result); | ||
} | ||
|
||
Ticket.prototype.stopped = function () { | ||
this.eta = new ETA(); | ||
this.isFinished = false; | ||
this.isStarted = false; | ||
this.status = 'queued'; | ||
this.result = null; | ||
} | ||
|
||
Ticket.prototype.progress = function (current) { | ||
this.eta.done = current; | ||
this.emit('progress', { | ||
current: this.eta.done, | ||
total: this.eta.count, | ||
pct: (this.eta.done/this.eta.count)*100, | ||
eta: this.eta.format('{{etah}}') | ||
}); | ||
} | ||
|
||
module.exports = Ticket; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
|
||
var Ticket = require('./ticket'); | ||
|
||
function Tickets() { | ||
this.tickets = []; | ||
} | ||
|
||
Tickets.prototype._apply = function (fn, args) { | ||
this.tickets.forEach(function (ticket) { | ||
ticket[fn].apply(ticket, args); | ||
}) | ||
} | ||
|
||
Tickets.prototype.push = function (ticket) { | ||
this.tickets.push(ticket); | ||
} | ||
|
||
Object.keys(Ticket.prototype).forEach(function (method) { | ||
Tickets.prototype[method] = function () { | ||
this._apply(method, arguments); | ||
} | ||
}) | ||
|
||
module.exports = Tickets; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
{ | ||
"name": "better-queue", | ||
"version": "1.0.0", | ||
"description": "Better Queue for NodeJS", | ||
"main": "lib/queue.js", | ||
"directories": { | ||
"test": "test" | ||
}, | ||
"scripts": { | ||
"test": "mocha" | ||
}, | ||
"repository": { | ||
"type": "git", | ||
"url": "https://github.com/leanderlee/better-queue.git" | ||
}, | ||
"keywords": [ | ||
"persistent", | ||
"network", | ||
"async", | ||
"queue", | ||
"best", | ||
"better" | ||
], | ||
"author": "Leander Lee <me@leander.ca>", | ||
"license": "MIT", | ||
"bugs": { | ||
"url": "https://github.com/leanderlee/better-queue/issues" | ||
}, | ||
"homepage": "https://github.com/leanderlee/better-queue", | ||
"devDependencies": { | ||
"mocha": "^2.3.4" | ||
}, | ||
"dependencies": { | ||
"node-eta": "^0.9.0", | ||
"node-uuid": "^1.4.7" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
var Queue = require('./Queue') | ||
var q = new Queue(function (task, cb) { | ||
console.log('working' + task); | ||
setTimeout(function () { | ||
console.log('finished' + task); | ||
cb(null, { message: 'done' + task }); | ||
}, task*1000); | ||
}, { concurrent: 2 }) | ||
console.log('queued 1') | ||
q.push(1); | ||
console.log('queued 2') | ||
q.push(2); | ||
console.log('queued 3') | ||
q.push(3); | ||
console.log('queued 4') | ||
q.push(4); | ||
|
Oops, something went wrong.