Skip to content

Commit

Permalink
Moved code around. Renamed internal properties to be slightly cleaner
Browse files Browse the repository at this point in the history
  • Loading branch information
leanderlee committed Mar 4, 2016
1 parent 3e8636f commit bbf50d0
Show file tree
Hide file tree
Showing 12 changed files with 711 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[*.{js,json}]
indent_style = space
indent_size = 2
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ build/Release
# Dependency directory
# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git
node_modules
.DS_Store
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# better-queue
Better Queue for NodeJS


Queues can be quite hard. There's a lot of cases to consider. Luckily, better-queue handles all of these cases!
198 changes: 198 additions & 0 deletions lib/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@

var uuid = require('node-uuid');
var Ticket = require('./ticket');
var Tickets = require('./tickets');

function Queue(process, opts) {
var self = this;
opts = opts || {};
if (typeof process === 'object') {
opts = process || {};
}
if (typeof process === 'function') {
opts.process = process;
}
if (!opts.process) {
throw new Error("Queue has no process function.");
}

opts = opts || {};

self.accept = opts.accept || function (input, cb) { cb(null, input) };
self.process = opts.process || function (task, cb) { cb(null, {}) };
self.merge = opts.merge || function (task1, task2, cb) { cb(null, task2) };
self.running = opts.running || function (worker, task, cb) { cb(null, task) };

self.fifo = opts.fifo || false;
self.batchSize = opts.batchSize || 1;
self.concurrent = opts.concurrent || 1;
self.idleTimeout = opts.idleTimeout || 0;
self.taskTimeout = opts.taskTimeout || Infinity;
self.maxRetries = opts.maxRetries || Infinity;

self._processes = 0;
self._queue = [];
self._tasks = {}; // Map of taskId => task
self._retries = {}; // Map of taskId => retries
self._workers = {}; // Map of taskId => active job
self._tickets = {}; // Map of taskId => tickets

// TODO: Hold queue (wait for some event; like reconnect)
// TODO: Load from persistent storage
}

Queue.prototype.push = function (input, cb) {
var self = this;
var ticket = new Ticket();
if (cb) {
ticket
.on('done', function (result) { cb(null, result) })
.on('error', function (err) { cb(err) })
}
self.accept(input, function (err, task) {
if (err || task === undefined || task === false || task === null) {
return ticket.failed('input_rejected');
}
var taskId = task.id || uuid.v4();
ticket.accept();
self._updateQueue(taskId, task, function (err) {
if (err) return ticket.failed('failed_to_queue');
if (!self._tickets[taskId]) {
self._tickets[taskId] = new Tickets();
}
self._tickets[taskId].push(ticket);
ticket.queued();
setImmediate(function () {
self._processNext();
})
});
})
return ticket;
}

Queue.prototype._updateQueue = function (taskId, task, cb) {
var self = this;
if (self._workers[taskId]) {
self.running(self._workers[taskId], task, function (err, task) {
if (err) return cb(err);
if (task === undefined || task === false || task === null) return cb();
self._enqueueTask(taskId, task, cb);
})
} else {
self._enqueueTask(taskId, task, cb);
}
}

Queue.prototype._enqueueTask = function (taskId, task, cb) {
var self = this;
// Check if already in queue
if (self._tasks[taskId] !== undefined) {
self.merge(self._tasks[taskId], task, function (err, newTask) {
if (err) return cb({ message: 'failed_task_merge' });
if (newTask !== undefined) {
self._tasks[taskId] = newTask;
}
cb();
});
} else {
self._tasks[taskId] = task;
self._queue.push(taskId);
cb();
}
}

Queue.prototype._processNext = function () {
var self = this;
if (!self._queue.length) return;
if (self._processes >= self.concurrent) return;
self._processes++;
var batch = [];
while (self._queue.length && batch.length < self.batchSize) {
var taskId = self._queue[self.fifo ? 'pop' : 'shift']();
var task = self._tasks[taskId];
batch.push({
id: taskId,
task: self._tasks[taskId],
ticket: self._tickets[taskId]
});
delete self._tasks[taskId];
delete self._tickets[taskId];
}
if (!batch.length) return;
setImmediate(function () {
self._startTask(batch);
})
if (self._queue.length) {
setImmediate(function () {
self._processNext();
})
}
}

Queue.prototype._startTask = function (batch) {
var self = this;
var task = null;
var tickets = {};
var failedTask = function (taskId, msg) {
if (tickets[taskId]) {
tickets[taskId].failed(msg);
delete tickets[taskId];
}
setTimeout(function () {
self._processNext();
}, self.idleTimeout);
}
var finishedTask = function (taskId, result) {
if (tickets[taskId]) {
tickets[taskId].finish(result);
delete tickets[taskId];
}
setTimeout(function () {
self._processNext();
}, self.idleTimeout);
}
batch.forEach(function (item) {
if (item.ticket) {
tickets[item.id] = item.ticket;
tickets[item.id].started(item.task.total);
}
});
if (self.batchSize === 1) {
task = batch[0].task;
} else {
var tasks = {};
batch.forEach(function (item) {
tasks[item.id] = item.task;
});
task = {
tasks: tasks,
finish: finishedTask,
failed: failedTask,
progress: function (taskId, current) {
if (tickets[taskId]) {
tickets[taskId].progress(current);
}
},
}
}
var worker = self.process(task, function (err, result) {
self._processes--;
if (err) {
// TODO: Retry?
for (var taskId in tickets) {
failedTask(taskId, err.message || err);
delete self._workers[taskId];
}
} else {
for (var taskId in tickets) {
finishedTask(taskId, result);
delete self._workers[taskId];
}
}
});
batch.forEach(function (item) {
self._workers[item.id] = worker || {};
});
}

module.exports = Queue;
74 changes: 74 additions & 0 deletions lib/ticket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@

var util = require('util');
var EE = require('events').EventEmitter;
var ETA = require('node-eta');

function Ticket(opts) {
this.isAccepted = false;
this.isQueued = false;
this.isStarted = false;
this.isFailed = false;
this.isFinished = false;
this.result = null;
this.status = 'created';
this.eta = new ETA();
}

util.inherits(Ticket, EE);

Ticket.prototype.accept = function () {
this.status = 'accepted';
this.isAccepted = true;
}

Ticket.prototype.queued = function () {
this.status = 'queued';
this.isQueued = true;
}

Ticket.prototype.unqueued = function () {
this.status = 'accepted';
this.isQueued = false;
}

Ticket.prototype.started = function (total) {
this.eta.count = total || 1;
this.eta.start();
this.isStarted = true;
this.status = 'in-progress';
}

Ticket.prototype.failed = function (msg) {
this.isFailed = true;
this.isFinished = true;
this.status = 'failed';
this.emit('error', new Error(msg));
}

Ticket.prototype.finish = function (result) {
this.eta.done = this.eta.count;
this.isFinished = true;
this.status = 'finished';
this.result = result;
this.emit('done', this.result);
}

Ticket.prototype.stopped = function () {
this.eta = new ETA();
this.isFinished = false;
this.isStarted = false;
this.status = 'queued';
this.result = null;
}

Ticket.prototype.progress = function (current) {
this.eta.done = current;
this.emit('progress', {
current: this.eta.done,
total: this.eta.count,
pct: (this.eta.done/this.eta.count)*100,
eta: this.eta.format('{{etah}}')
});
}

module.exports = Ticket;
24 changes: 24 additions & 0 deletions lib/tickets.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@

var Ticket = require('./ticket');

function Tickets() {
this.tickets = [];
}

Tickets.prototype._apply = function (fn, args) {
this.tickets.forEach(function (ticket) {
ticket[fn].apply(ticket, args);
})
}

Tickets.prototype.push = function (ticket) {
this.tickets.push(ticket);
}

Object.keys(Ticket.prototype).forEach(function (method) {
Tickets.prototype[method] = function () {
this._apply(method, arguments);
}
})

module.exports = Tickets;
37 changes: 37 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"name": "better-queue",
"version": "1.0.0",
"description": "Better Queue for NodeJS",
"main": "lib/queue.js",
"directories": {
"test": "test"
},
"scripts": {
"test": "mocha"
},
"repository": {
"type": "git",
"url": "https://github.com/leanderlee/better-queue.git"
},
"keywords": [
"persistent",
"network",
"async",
"queue",
"best",
"better"
],
"author": "Leander Lee <me@leander.ca>",
"license": "MIT",
"bugs": {
"url": "https://github.com/leanderlee/better-queue/issues"
},
"homepage": "https://github.com/leanderlee/better-queue",
"devDependencies": {
"mocha": "^2.3.4"
},
"dependencies": {
"node-eta": "^0.9.0",
"node-uuid": "^1.4.7"
}
}
17 changes: 17 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
var Queue = require('./Queue')
var q = new Queue(function (task, cb) {
console.log('working' + task);
setTimeout(function () {
console.log('finished' + task);
cb(null, { message: 'done' + task });
}, task*1000);
}, { concurrent: 2 })
console.log('queued 1')
q.push(1);
console.log('queued 2')
q.push(2);
console.log('queued 3')
q.push(3);
console.log('queued 4')
q.push(4);

Loading

0 comments on commit bbf50d0

Please sign in to comment.