diff --git a/.github/issue_template.md b/.github/issue_template.md new file mode 100644 index 000000000..8057186f8 --- /dev/null +++ b/.github/issue_template.md @@ -0,0 +1,14 @@ + + +## Description + +## Test code to reproduce + +## Bull version + +## Additional information + diff --git a/CHANGELOG.md b/CHANGELOG.md index d053d6027..8a90efa3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +v.3.3.8 +======= + +- Fixed #812. External process doesn't terminate on `queue.close()`. +- Fixed #830. Named Process Sent to Wrong Processor. +- Fixed #572. Do not close external connections. + v.3.3.7 ======= diff --git a/PATTERNS.md b/PATTERNS.md index 728ed2b3d..0f0b39f21 100644 --- a/PATTERNS.md +++ b/PATTERNS.md @@ -67,8 +67,11 @@ Reusing Redis Connections A standard queue requires **3 connections** to the Redis server. In some situations you might want to re-use connections—for example on Heroku where the connection count is restricted. You can do this with the `createClient` option in the `Queue` constructor: ```js -var client = new redis(); -var subscriber = new redis(); +var {REDIS_URL} = process.env + +var Redis = require('ioredis') +var client = new Redis(REDIS_URL); +var subscriber = new Redis(REDIS_URL); var opts = { createClient: function(type){ @@ -78,7 +81,7 @@ var opts = { case 'subscriber': return subscriber; default: - return new redis(); + return new Redis(); } } } diff --git a/README.md b/README.md index 3265584a8..dc40acd06 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,9 @@ + + Taskforce.sh, Inc + Are you developing bull sponsored by a company? Please, let us now! @@ -95,6 +98,7 @@ There are a few third-party UIs that you can use for monitoring: **Bull v3** +- [NEW (Preview) Taskforce](https://taskforce.sh) - [Arena](https://github.com/mixmaxhq/arena) **Bull <= v2** diff --git a/REFERENCE.md b/REFERENCE.md index cb0087722..4bf9fb1d4 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -55,7 +55,7 @@ The optional ```url``` argument, allows to specify a redis connection string suc ```redis://mypassword@myredis.server.com:1234``` ```typescript -interface QueueOpts{ +interface QueueOptions { limiter?: RateLimiter; redis?: RedisOpts; prefix?: string = 'bull'; // prefix for all queue keys. @@ -117,7 +117,7 @@ __Warning:__ Do not override these advanced settings unless you understand the i * Consider these as overloaded functions. Since method overloading doesn't exist in javacript * bull recognizes the desired function call by checking the parameters' types. Make sure you * comply with one of the below defined patterns. - * + * * Note: Concurrency defaults to 1 if not specified. */ process(processor: (job, done?) => Promise | string) diff --git a/lib/job.js b/lib/job.js index 14f133558..ab1391db3 100644 --- a/lib/job.js +++ b/lib/job.js @@ -158,7 +158,7 @@ Job.prototype.releaseLock = function(){ var _this = this; return scripts.releaseLock(this.queue, this.id).then(function(unlocked) { if(unlocked != 1){ - throw Error('Could not release lock for job ' + _this.id); + throw new Error('Could not release lock for job ' + _this.id); } }); }; @@ -333,7 +333,7 @@ Job.prototype.remove = function(){ if(removed){ queue.emit('removed', job); }else{ - throw Error('Could not remove job ' + job.id); + throw new Error('Could not remove job ' + job.id); } }); }; diff --git a/lib/process/child-pool.js b/lib/process/child-pool.js index 2db45a6df..18091add9 100644 --- a/lib/process/child-pool.js +++ b/lib/process/child-pool.js @@ -11,17 +11,18 @@ module.exports = function ChildPool() { } this.retained = {}; - this.free = []; + this.free = {}; this.retain = Promise.method(function(processFile){ - var child = this.free.pop(); + var child = this.getFree(processFile).pop(); if (child) { return child; } child = fork(path.join(__dirname, './master.js')); - + child.processFile = processFile; + this.retained[child.pid] = child; child.on('exit', this.remove.bind(this, child)); @@ -37,14 +38,17 @@ module.exports = function ChildPool() { this.release = function(child){ delete this.retained[child.pid]; - this.free.push(child); + this.getFree(child.processFile).push(child); }; this.remove = function(child){ delete this.retained[child.pid]; - var childIndex = this.free.indexOf(child); + + var free = this.getFree(child.processFile); + + var childIndex = free.indexOf(child); if (childIndex > -1) { - this.free.splice(childIndex, 1); + free.splice(childIndex, 1); } }; @@ -54,13 +58,22 @@ module.exports = function ChildPool() { }; this.clean = function(){ - var children = _.values(this.retained).concat(this.free); + var children = _.values(this.retained).concat(this.getAllFree()); var _this = this; children.forEach(function(child){ - _this.kill(child, 0); + // TODO: We may want to use SIGKILL if the process does not die after some time. + _this.kill(child, 'SIGTERM'); }); this.retained = {}; - this.free = []; + this.free = {}; + }; + + this.getFree = function(id) { + return this.free[id] = this.free[id] || []; + }; + + this.getAllFree = function(){ + return _.flatten(_.values(this.free)); }; }; diff --git a/lib/queue.js b/lib/queue.js index d2020f076..07079aa36 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -234,7 +234,9 @@ function redisClientGetter(queue, options, initCallback) { return function() { // getter function if (connections[type] != null) return connections[type]; var client = connections[type] = createClient(type, options.redis); - queue.clients.push(client); + if(!options.createClient){ + queue.clients.push(client); + } return initCallback(type, client), client; }; }; @@ -264,6 +266,7 @@ function setGuardianTimer(queue){ if(timestamp){ queue.updateDelayTimer(timestamp); } + return null; }).catch(function(err){ queue.emit('error', err); }).return(null); @@ -314,6 +317,7 @@ Queue.prototype._initProcess = function(){ if(timestamp){ _this.updateDelayTimer(timestamp); } + return null; }).return(null); }).then(function(){ // diff --git a/package.json b/package.json index 41f34b6da..ad13fb49e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bull", - "version": "3.3.7", + "version": "3.3.8", "description": "Job manager", "main": "./index.js", "repository": { diff --git a/test/fixtures/fixture_processor_bar.js b/test/fixtures/fixture_processor_bar.js new file mode 100644 index 000000000..6e3b91902 --- /dev/null +++ b/test/fixtures/fixture_processor_bar.js @@ -0,0 +1,12 @@ +/** + * A processor file to be used in tests. + * + */ + +var Promise = require('bluebird'); + +module.exports = function(/*job*/){ + return Promise.delay(500).then(function(){ + return 'bar'; + }); +}; diff --git a/test/fixtures/fixture_processor_foo.js b/test/fixtures/fixture_processor_foo.js new file mode 100644 index 000000000..fe8f6d760 --- /dev/null +++ b/test/fixtures/fixture_processor_foo.js @@ -0,0 +1,12 @@ +/** + * A processor file to be used in tests. + * + */ + +var Promise = require('bluebird'); + +module.exports = function(/*job*/){ + return Promise.delay(500).then(function(){ + return 'foo'; + }); +}; diff --git a/test/test_connection.js b/test/test_connection.js index 12b4c8fa5..7f2fdbf33 100644 --- a/test/test_connection.js +++ b/test/test_connection.js @@ -71,4 +71,36 @@ describe('connection', function () { }); }); + it('should not close external connections', function () { + + var client = new redis(); + var subscriber = new redis(); + + var opts = { + createClient: function(type){ + switch(type){ + case 'client': + return client; + case 'subscriber': + return subscriber; + default: + return new redis(); + } + } + }; + + var testQueue = utils.buildQueue('external connections', opts); + + return testQueue.isReady().then(function(){ + return testQueue.add({'foo': 'bar'}); + }).then(function(){ + expect(testQueue.client).to.be.eql(client); + expect(testQueue.eclient).to.be.eql(subscriber); + + return testQueue.close(); + }).then(function(){ + expect(client.status).to.be.eql('ready'); + expect(subscriber.status).to.be.eql('ready'); + }); + }); }); diff --git a/test/test_repeat.js b/test/test_repeat.js index dba67e8f4..9cd6f5bfb 100644 --- a/test/test_repeat.js +++ b/test/test_repeat.js @@ -12,6 +12,7 @@ var ONE_SECOND = 1000; var ONE_MINUTE = 60 * ONE_SECOND; var ONE_HOUR = 60 * ONE_MINUTE; var ONE_DAY = 24 * ONE_HOUR; +var MAX_INT = 2147483647; describe('repeat', function () { var queue; @@ -21,8 +22,8 @@ describe('repeat', function () { var client = new redis(); return client.flushdb().then(function(){ queue = utils.buildQueue('repeat', {settings: { - guardInterval: Number.MAX_VALUE, - stalledInterval: Number.MAX_VALUE, + guardInterval: MAX_INT, + stalledInterval: MAX_INT, drainDelay: 1 // Small delay so that .close is faster. }}); return queue; diff --git a/test/test_sandboxed_process.js b/test/test_sandboxed_process.js index 0af0da99a..51401054e 100644 --- a/test/test_sandboxed_process.js +++ b/test/test_sandboxed_process.js @@ -14,8 +14,8 @@ describe('sandboxed process', function () { var client = new redis(); return client.flushdb().then(function(){ queue = utils.buildQueue('test process', {settings: { - guardInterval: Number.MAX_VALUE, - stalledInterval: Number.MAX_VALUE + guardInterval: 300000, + stalledInterval: 300000, }}); return queue; }); @@ -29,14 +29,15 @@ describe('sandboxed process', function () { }); it('should process and complete', function (done) { - queue.process(__dirname + '/fixtures/fixture_processor.js'); + var processFile = __dirname + '/fixtures/fixture_processor.js'; + queue.process(processFile); queue.on('completed', function(job, value){ try { expect(job.data).to.be.eql({foo:'bar'}); expect(value).to.be.eql(42); expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0); - expect(queue.childPool.free).to.have.lengthOf(1); + expect(queue.childPool.free[processFile]).to.have.lengthOf(1); done(); } catch (err) { done(err); @@ -47,14 +48,15 @@ describe('sandboxed process', function () { }); it('should process with named processor', function (done) { - queue.process('foobar', __dirname + '/fixtures/fixture_processor.js'); + var processFile = __dirname + '/fixtures/fixture_processor.js'; + queue.process('foobar', processFile); queue.on('completed', function(job, value){ try { expect(job.data).to.be.eql({foo:'bar'}); expect(value).to.be.eql(42); expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0); - expect(queue.childPool.free).to.have.lengthOf(1); + expect(queue.childPool.free[processFile]).to.have.lengthOf(1); done(); } catch (err) { done(err); @@ -64,15 +66,63 @@ describe('sandboxed process', function () { queue.add('foobar', {foo:'bar'}); }); + it('should process with several named processors', function (done) { + var processFileFoo = __dirname + '/fixtures/fixture_processor_foo.js'; + var processFileBar = __dirname + '/fixtures/fixture_processor_bar.js'; + + queue.process('foo', processFileFoo); + queue.process('bar', processFileBar); + + var count = 0; + queue.on('completed', function(job, value){ + var data, result, processFile, retainedLength; + count++; + if(count == 1){ + data = {foo: 'bar'}; + result = 'foo'; + processFile = processFileFoo; + retainedLength = 1; + }else { + data = {bar: 'qux'}; + result = 'bar'; + processFile = processFileBar; + retainedLength = 0; + } + + try { + expect(job.data).to.be.eql(data); + expect(value).to.be.eql(result); + expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(retainedLength); + expect(queue.childPool.free[processFile]).to.have.lengthOf(1); + if(count === 2){ + done(); + } + } catch (err) { + console.error(err); + done(err); + } + }); + + queue.add('foo', {foo: 'bar'}).then(function(){ + Promise.delay(500).then(function(){ + queue.add('bar', {bar: 'qux'}); + }); + }); + + queue.on('error', function(err){ + console.error(err); + }); + }); + it('should process with concurrent processors', function (done) { var after = _.after(4, function(){ - expect(queue.childPool.free.length).to.eql(4); + expect(queue.childPool.getAllFree().length).to.eql(4); done(); }); queue.on('completed', function(job, value){ try { expect(value).to.be.eql(42); - expect(Object.keys(queue.childPool.retained).length + queue.childPool.free.length).to.eql(4); + expect(Object.keys(queue.childPool.retained).length + queue.childPool.getAllFree().length).to.eql(4); after(); } catch (err) { done(err); @@ -97,7 +147,7 @@ describe('sandboxed process', function () { expect(job.data).to.be.eql({foo:'bar'}); expect(value).to.be.eql(42); expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0); - expect(queue.childPool.free).to.have.lengthOf(1); + expect(queue.childPool.getAllFree()).to.have.lengthOf(1); done(); } catch (err) { done(err); @@ -117,7 +167,7 @@ describe('sandboxed process', function () { expect(job.progress()).to.be.eql(100); expect(progresses).to.be.eql([10, 27, 78, 100]); expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0); - expect(queue.childPool.free).to.have.lengthOf(1); + expect(queue.childPool.getAllFree()).to.have.lengthOf(1); done(); } catch (err) { done(err); @@ -141,7 +191,7 @@ describe('sandboxed process', function () { expect(job.failedReason).eql('Manually failed processor'); expect(err.message).eql('Manually failed processor'); expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0); - expect(queue.childPool.free).to.have.lengthOf(1); + expect(queue.childPool.getAllFree()).to.have.lengthOf(1); done(); } catch (err) { done(err); @@ -160,7 +210,7 @@ describe('sandboxed process', function () { expect(job.failedReason).eql('Manually failed processor'); expect(err.message).eql('Manually failed processor'); expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0); - expect(queue.childPool.free).to.have.lengthOf(1); + expect(queue.childPool.getAllFree()).to.have.lengthOf(1); done(); } catch (err) { done(err); @@ -176,12 +226,11 @@ describe('sandboxed process', function () { queue.on('completed', function(){ try { expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0); - expect(queue.childPool.free).to.have.lengthOf(1); + expect(queue.childPool.getAllFree()).to.have.lengthOf(1); Promise.delay(500).then(function(){ expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0); - expect(queue.childPool.free).to.have.lengthOf(0); - }) - .asCallback(done); + expect(queue.childPool.getAllFree()).to.have.lengthOf(0); + }).asCallback(done); } catch (err) { done(err); } diff --git a/test/test_worker.js b/test/test_worker.js index b87c77e7c..0975397de 100644 --- a/test/test_worker.js +++ b/test/test_worker.js @@ -12,8 +12,8 @@ describe('workers', function () { var client = new redis(); return client.flushdb().then(function(){ queue = utils.buildQueue('test workers', {settings: { - guardInterval: Number.MAX_VALUE, - stalledInterval: Number.MAX_VALUE + guardInterval: 300000, + stalledInterval: 300000 }}); return queue; });