forked from bitpay/bitcore
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request bitpay#1882 from matiu/ref/locker
Ref/locker
- Loading branch information
Showing
19 changed files
with
415 additions
and
273 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,47 +1,96 @@ | ||
var $ = require('preconditions').singleton(); | ||
const $ = require('preconditions').singleton(); | ||
var _ = require('lodash'); | ||
var log = require('npmlog'); | ||
log.debug = log.verbose; | ||
log.disableColor(); | ||
|
||
var LocalLock = require('./locallock'); | ||
var RemoteLock = require('locker'); | ||
var Common = require('./common'); | ||
var Defaults = Common.Defaults; | ||
|
||
var Errors = require('./errors/errordefinitions'); | ||
|
||
function Lock(opts) { | ||
|
||
const ACQUIRE_RETRY_STEP = 50; //ms | ||
|
||
function Lock(storage, opts) { | ||
opts = opts || {}; | ||
if (opts.lockerServer) { | ||
this.lock = new RemoteLock(opts.lockerServer.port, opts.lockerServer.host); | ||
|
||
log.info('Using locker server:' + opts.lockerServer.host + ':' + opts.lockerServer.port); | ||
|
||
this.lock.on('reset', function() { | ||
log.debug('Locker server reset'); | ||
}); | ||
this.lock.on('error', function(error) { | ||
log.error('Locker server threw error', error); | ||
}); | ||
} else { | ||
this.lock = new LocalLock(); | ||
} | ||
|
||
this.storage = storage; | ||
}; | ||
|
||
Lock.prototype.runLocked = function(token, cb, task, waitTime) { | ||
|
||
Lock.prototype.acquire = function(token, opts, cb, timeLeft) { | ||
var self = this; | ||
opts = opts || {}; | ||
|
||
opts.lockTime = opts.lockTime || Defaults.LOCK_EXE_TIME; | ||
|
||
this.storage.acquireLock(token, (err) => { | ||
|
||
// Lock taken? | ||
if(err && err.message && err.message.indexOf('E11000 ') !== -1) { | ||
|
||
// Waiting time for lock has expired | ||
if (timeLeft < 0) { | ||
return cb('LOCKED'); | ||
} | ||
|
||
|
||
if (!_.isUndefined(opts.waitTime)) { | ||
if (_.isUndefined(timeLeft)) { | ||
timeLeft = opts.waitTime; | ||
} else { | ||
timeLeft -= ACQUIRE_RETRY_STEP; | ||
} | ||
} | ||
|
||
return setTimeout(self.acquire.bind(self,token, opts, cb, timeLeft), ACQUIRE_RETRY_STEP); | ||
|
||
// Actual DB error | ||
} else if (err) { | ||
return cb(err); | ||
|
||
// Lock available | ||
} else { | ||
|
||
var lockTimerId; | ||
|
||
function release(icb) { | ||
if (!icb) icb = () => {}; | ||
|
||
clearTimeout(lockTimerId); | ||
|
||
self.storage.releaseLock(token, icb); | ||
} | ||
|
||
|
||
lockTimerId = setTimeout(() => { | ||
release(); | ||
}, opts.lockTime); | ||
|
||
return cb(null, (icb) => { | ||
release(icb); | ||
}); | ||
} | ||
}); | ||
}; | ||
|
||
Lock.prototype.runLocked = function(token, opts, cb, task) { | ||
$.shouldBeDefined(token); | ||
|
||
waitTime = waitTime || Defaults.LOCK_WAIT_TIME; | ||
this.acquire(token, opts, function(err, release) { | ||
if (err == 'LOCKED' ) return cb(Errors.WALLET_BUSY); | ||
if (err) return cb(err); | ||
|
||
|
||
this.lock.locked(token, waitTime , Defaults.LOCK_EXE_TIME, function(err, release) { | ||
if (err) return cb(Errors.WALLET_BUSY); | ||
var _cb = function() { | ||
|
||
cb.apply(null, arguments); | ||
release(); | ||
}; | ||
task(_cb); | ||
}); | ||
}; | ||
|
||
|
||
module.exports = Lock; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.