diff --git a/.github/issue_template.md b/.github/issue_template.md
new file mode 100644
index 000000000..8057186f8
--- /dev/null
+++ b/.github/issue_template.md
@@ -0,0 +1,14 @@
+
+
+## Description
+
+## Test code to reproduce
+
+## Bull version
+
+## Additional information
+
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d053d6027..8a90efa3c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+v.3.3.8
+=======
+
+- Fixed #812. External process doesn't terminate on `queue.close()`.
+- Fixed #830. Named Process Sent to Wrong Processor.
+- Fixed #572. Do not close external connections.
+
v.3.3.7
=======
diff --git a/PATTERNS.md b/PATTERNS.md
index 728ed2b3d..0f0b39f21 100644
--- a/PATTERNS.md
+++ b/PATTERNS.md
@@ -67,8 +67,11 @@ Reusing Redis Connections
A standard queue requires **3 connections** to the Redis server. In some situations you might want to re-use connections—for example on Heroku where the connection count is restricted. You can do this with the `createClient` option in the `Queue` constructor:
```js
-var client = new redis();
-var subscriber = new redis();
+var {REDIS_URL} = process.env
+
+var Redis = require('ioredis')
+var client = new Redis(REDIS_URL);
+var subscriber = new Redis(REDIS_URL);
var opts = {
createClient: function(type){
@@ -78,7 +81,7 @@ var opts = {
case 'subscriber':
return subscriber;
default:
- return new redis();
+ return new Redis();
}
}
}
diff --git a/README.md b/README.md
index 3265584a8..dc40acd06 100644
--- a/README.md
+++ b/README.md
@@ -60,6 +60,9 @@
+
+
+
Are you developing bull sponsored by a company? Please, let us now!
@@ -95,6 +98,7 @@ There are a few third-party UIs that you can use for monitoring:
**Bull v3**
+- [NEW (Preview) Taskforce](https://taskforce.sh)
- [Arena](https://github.com/mixmaxhq/arena)
**Bull <= v2**
diff --git a/REFERENCE.md b/REFERENCE.md
index cb0087722..4bf9fb1d4 100644
--- a/REFERENCE.md
+++ b/REFERENCE.md
@@ -55,7 +55,7 @@ The optional ```url``` argument, allows to specify a redis connection string suc
```redis://mypassword@myredis.server.com:1234```
```typescript
-interface QueueOpts{
+interface QueueOptions {
limiter?: RateLimiter;
redis?: RedisOpts;
prefix?: string = 'bull'; // prefix for all queue keys.
@@ -117,7 +117,7 @@ __Warning:__ Do not override these advanced settings unless you understand the i
* Consider these as overloaded functions. Since method overloading doesn't exist in javacript
* bull recognizes the desired function call by checking the parameters' types. Make sure you
* comply with one of the below defined patterns.
- *
+ *
* Note: Concurrency defaults to 1 if not specified.
*/
process(processor: (job, done?) => Promise | string)
diff --git a/lib/job.js b/lib/job.js
index 14f133558..ab1391db3 100644
--- a/lib/job.js
+++ b/lib/job.js
@@ -158,7 +158,7 @@ Job.prototype.releaseLock = function(){
var _this = this;
return scripts.releaseLock(this.queue, this.id).then(function(unlocked) {
if(unlocked != 1){
- throw Error('Could not release lock for job ' + _this.id);
+ throw new Error('Could not release lock for job ' + _this.id);
}
});
};
@@ -333,7 +333,7 @@ Job.prototype.remove = function(){
if(removed){
queue.emit('removed', job);
}else{
- throw Error('Could not remove job ' + job.id);
+ throw new Error('Could not remove job ' + job.id);
}
});
};
diff --git a/lib/process/child-pool.js b/lib/process/child-pool.js
index 2db45a6df..18091add9 100644
--- a/lib/process/child-pool.js
+++ b/lib/process/child-pool.js
@@ -11,17 +11,18 @@ module.exports = function ChildPool() {
}
this.retained = {};
- this.free = [];
+ this.free = {};
this.retain = Promise.method(function(processFile){
- var child = this.free.pop();
+ var child = this.getFree(processFile).pop();
if (child) {
return child;
}
child = fork(path.join(__dirname, './master.js'));
-
+ child.processFile = processFile;
+
this.retained[child.pid] = child;
child.on('exit', this.remove.bind(this, child));
@@ -37,14 +38,17 @@ module.exports = function ChildPool() {
this.release = function(child){
delete this.retained[child.pid];
- this.free.push(child);
+ this.getFree(child.processFile).push(child);
};
this.remove = function(child){
delete this.retained[child.pid];
- var childIndex = this.free.indexOf(child);
+
+ var free = this.getFree(child.processFile);
+
+ var childIndex = free.indexOf(child);
if (childIndex > -1) {
- this.free.splice(childIndex, 1);
+ free.splice(childIndex, 1);
}
};
@@ -54,13 +58,22 @@ module.exports = function ChildPool() {
};
this.clean = function(){
- var children = _.values(this.retained).concat(this.free);
+ var children = _.values(this.retained).concat(this.getAllFree());
var _this = this;
children.forEach(function(child){
- _this.kill(child, 0);
+ // TODO: We may want to use SIGKILL if the process does not die after some time.
+ _this.kill(child, 'SIGTERM');
});
this.retained = {};
- this.free = [];
+ this.free = {};
+ };
+
+ this.getFree = function(id) {
+ return this.free[id] = this.free[id] || [];
+ };
+
+ this.getAllFree = function(){
+ return _.flatten(_.values(this.free));
};
};
diff --git a/lib/queue.js b/lib/queue.js
index d2020f076..07079aa36 100755
--- a/lib/queue.js
+++ b/lib/queue.js
@@ -234,7 +234,9 @@ function redisClientGetter(queue, options, initCallback) {
return function() { // getter function
if (connections[type] != null) return connections[type];
var client = connections[type] = createClient(type, options.redis);
- queue.clients.push(client);
+ if(!options.createClient){
+ queue.clients.push(client);
+ }
return initCallback(type, client), client;
};
};
@@ -264,6 +266,7 @@ function setGuardianTimer(queue){
if(timestamp){
queue.updateDelayTimer(timestamp);
}
+ return null;
}).catch(function(err){
queue.emit('error', err);
}).return(null);
@@ -314,6 +317,7 @@ Queue.prototype._initProcess = function(){
if(timestamp){
_this.updateDelayTimer(timestamp);
}
+ return null;
}).return(null);
}).then(function(){
//
diff --git a/package.json b/package.json
index 41f34b6da..ad13fb49e 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "bull",
- "version": "3.3.7",
+ "version": "3.3.8",
"description": "Job manager",
"main": "./index.js",
"repository": {
diff --git a/test/fixtures/fixture_processor_bar.js b/test/fixtures/fixture_processor_bar.js
new file mode 100644
index 000000000..6e3b91902
--- /dev/null
+++ b/test/fixtures/fixture_processor_bar.js
@@ -0,0 +1,12 @@
+/**
+ * A processor file to be used in tests.
+ *
+ */
+
+var Promise = require('bluebird');
+
+module.exports = function(/*job*/){
+ return Promise.delay(500).then(function(){
+ return 'bar';
+ });
+};
diff --git a/test/fixtures/fixture_processor_foo.js b/test/fixtures/fixture_processor_foo.js
new file mode 100644
index 000000000..fe8f6d760
--- /dev/null
+++ b/test/fixtures/fixture_processor_foo.js
@@ -0,0 +1,12 @@
+/**
+ * A processor file to be used in tests.
+ *
+ */
+
+var Promise = require('bluebird');
+
+module.exports = function(/*job*/){
+ return Promise.delay(500).then(function(){
+ return 'foo';
+ });
+};
diff --git a/test/test_connection.js b/test/test_connection.js
index 12b4c8fa5..7f2fdbf33 100644
--- a/test/test_connection.js
+++ b/test/test_connection.js
@@ -71,4 +71,36 @@ describe('connection', function () {
});
});
+ it('should not close external connections', function () {
+
+ var client = new redis();
+ var subscriber = new redis();
+
+ var opts = {
+ createClient: function(type){
+ switch(type){
+ case 'client':
+ return client;
+ case 'subscriber':
+ return subscriber;
+ default:
+ return new redis();
+ }
+ }
+ };
+
+ var testQueue = utils.buildQueue('external connections', opts);
+
+ return testQueue.isReady().then(function(){
+ return testQueue.add({'foo': 'bar'});
+ }).then(function(){
+ expect(testQueue.client).to.be.eql(client);
+ expect(testQueue.eclient).to.be.eql(subscriber);
+
+ return testQueue.close();
+ }).then(function(){
+ expect(client.status).to.be.eql('ready');
+ expect(subscriber.status).to.be.eql('ready');
+ });
+ });
});
diff --git a/test/test_repeat.js b/test/test_repeat.js
index dba67e8f4..9cd6f5bfb 100644
--- a/test/test_repeat.js
+++ b/test/test_repeat.js
@@ -12,6 +12,7 @@ var ONE_SECOND = 1000;
var ONE_MINUTE = 60 * ONE_SECOND;
var ONE_HOUR = 60 * ONE_MINUTE;
var ONE_DAY = 24 * ONE_HOUR;
+var MAX_INT = 2147483647;
describe('repeat', function () {
var queue;
@@ -21,8 +22,8 @@ describe('repeat', function () {
var client = new redis();
return client.flushdb().then(function(){
queue = utils.buildQueue('repeat', {settings: {
- guardInterval: Number.MAX_VALUE,
- stalledInterval: Number.MAX_VALUE,
+ guardInterval: MAX_INT,
+ stalledInterval: MAX_INT,
drainDelay: 1 // Small delay so that .close is faster.
}});
return queue;
diff --git a/test/test_sandboxed_process.js b/test/test_sandboxed_process.js
index 0af0da99a..51401054e 100644
--- a/test/test_sandboxed_process.js
+++ b/test/test_sandboxed_process.js
@@ -14,8 +14,8 @@ describe('sandboxed process', function () {
var client = new redis();
return client.flushdb().then(function(){
queue = utils.buildQueue('test process', {settings: {
- guardInterval: Number.MAX_VALUE,
- stalledInterval: Number.MAX_VALUE
+ guardInterval: 300000,
+ stalledInterval: 300000,
}});
return queue;
});
@@ -29,14 +29,15 @@ describe('sandboxed process', function () {
});
it('should process and complete', function (done) {
- queue.process(__dirname + '/fixtures/fixture_processor.js');
+ var processFile = __dirname + '/fixtures/fixture_processor.js';
+ queue.process(processFile);
queue.on('completed', function(job, value){
try {
expect(job.data).to.be.eql({foo:'bar'});
expect(value).to.be.eql(42);
expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0);
- expect(queue.childPool.free).to.have.lengthOf(1);
+ expect(queue.childPool.free[processFile]).to.have.lengthOf(1);
done();
} catch (err) {
done(err);
@@ -47,14 +48,15 @@ describe('sandboxed process', function () {
});
it('should process with named processor', function (done) {
- queue.process('foobar', __dirname + '/fixtures/fixture_processor.js');
+ var processFile = __dirname + '/fixtures/fixture_processor.js';
+ queue.process('foobar', processFile);
queue.on('completed', function(job, value){
try {
expect(job.data).to.be.eql({foo:'bar'});
expect(value).to.be.eql(42);
expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0);
- expect(queue.childPool.free).to.have.lengthOf(1);
+ expect(queue.childPool.free[processFile]).to.have.lengthOf(1);
done();
} catch (err) {
done(err);
@@ -64,15 +66,63 @@ describe('sandboxed process', function () {
queue.add('foobar', {foo:'bar'});
});
+ it('should process with several named processors', function (done) {
+ var processFileFoo = __dirname + '/fixtures/fixture_processor_foo.js';
+ var processFileBar = __dirname + '/fixtures/fixture_processor_bar.js';
+
+ queue.process('foo', processFileFoo);
+ queue.process('bar', processFileBar);
+
+ var count = 0;
+ queue.on('completed', function(job, value){
+ var data, result, processFile, retainedLength;
+ count++;
+ if(count == 1){
+ data = {foo: 'bar'};
+ result = 'foo';
+ processFile = processFileFoo;
+ retainedLength = 1;
+ }else {
+ data = {bar: 'qux'};
+ result = 'bar';
+ processFile = processFileBar;
+ retainedLength = 0;
+ }
+
+ try {
+ expect(job.data).to.be.eql(data);
+ expect(value).to.be.eql(result);
+ expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(retainedLength);
+ expect(queue.childPool.free[processFile]).to.have.lengthOf(1);
+ if(count === 2){
+ done();
+ }
+ } catch (err) {
+ console.error(err);
+ done(err);
+ }
+ });
+
+ queue.add('foo', {foo: 'bar'}).then(function(){
+ Promise.delay(500).then(function(){
+ queue.add('bar', {bar: 'qux'});
+ });
+ });
+
+ queue.on('error', function(err){
+ console.error(err);
+ });
+ });
+
it('should process with concurrent processors', function (done) {
var after = _.after(4, function(){
- expect(queue.childPool.free.length).to.eql(4);
+ expect(queue.childPool.getAllFree().length).to.eql(4);
done();
});
queue.on('completed', function(job, value){
try {
expect(value).to.be.eql(42);
- expect(Object.keys(queue.childPool.retained).length + queue.childPool.free.length).to.eql(4);
+ expect(Object.keys(queue.childPool.retained).length + queue.childPool.getAllFree().length).to.eql(4);
after();
} catch (err) {
done(err);
@@ -97,7 +147,7 @@ describe('sandboxed process', function () {
expect(job.data).to.be.eql({foo:'bar'});
expect(value).to.be.eql(42);
expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0);
- expect(queue.childPool.free).to.have.lengthOf(1);
+ expect(queue.childPool.getAllFree()).to.have.lengthOf(1);
done();
} catch (err) {
done(err);
@@ -117,7 +167,7 @@ describe('sandboxed process', function () {
expect(job.progress()).to.be.eql(100);
expect(progresses).to.be.eql([10, 27, 78, 100]);
expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0);
- expect(queue.childPool.free).to.have.lengthOf(1);
+ expect(queue.childPool.getAllFree()).to.have.lengthOf(1);
done();
} catch (err) {
done(err);
@@ -141,7 +191,7 @@ describe('sandboxed process', function () {
expect(job.failedReason).eql('Manually failed processor');
expect(err.message).eql('Manually failed processor');
expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0);
- expect(queue.childPool.free).to.have.lengthOf(1);
+ expect(queue.childPool.getAllFree()).to.have.lengthOf(1);
done();
} catch (err) {
done(err);
@@ -160,7 +210,7 @@ describe('sandboxed process', function () {
expect(job.failedReason).eql('Manually failed processor');
expect(err.message).eql('Manually failed processor');
expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0);
- expect(queue.childPool.free).to.have.lengthOf(1);
+ expect(queue.childPool.getAllFree()).to.have.lengthOf(1);
done();
} catch (err) {
done(err);
@@ -176,12 +226,11 @@ describe('sandboxed process', function () {
queue.on('completed', function(){
try {
expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0);
- expect(queue.childPool.free).to.have.lengthOf(1);
+ expect(queue.childPool.getAllFree()).to.have.lengthOf(1);
Promise.delay(500).then(function(){
expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0);
- expect(queue.childPool.free).to.have.lengthOf(0);
- })
- .asCallback(done);
+ expect(queue.childPool.getAllFree()).to.have.lengthOf(0);
+ }).asCallback(done);
} catch (err) {
done(err);
}
diff --git a/test/test_worker.js b/test/test_worker.js
index b87c77e7c..0975397de 100644
--- a/test/test_worker.js
+++ b/test/test_worker.js
@@ -12,8 +12,8 @@ describe('workers', function () {
var client = new redis();
return client.flushdb().then(function(){
queue = utils.buildQueue('test workers', {settings: {
- guardInterval: Number.MAX_VALUE,
- stalledInterval: Number.MAX_VALUE
+ guardInterval: 300000,
+ stalledInterval: 300000
}});
return queue;
});