Skip to content

Commit

Permalink
Merge pull request bitpay#1882 from matiu/ref/locker
Browse files Browse the repository at this point in the history
Ref/locker
  • Loading branch information
matiu authored Jan 10, 2019
2 parents ef7a5f0 + a7bcdcf commit e9eec4a
Show file tree
Hide file tree
Showing 19 changed files with 415 additions and 273 deletions.
4 changes: 2 additions & 2 deletions packages/bitcore-wallet-client/lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -2534,8 +2534,8 @@ API.prototype.getSendMaxInfo = function(opts, cb) {
* Get wallet status based on a string identifier (one of: walletId, address, txid)
*
* @param {string} opts.identifier - The identifier
* @param {Boolean} opts.twoStep[=false] - Optional: use 2-step balance computation for improved performance
* @param {Boolean} opts.includeExtendedInfo (optional: query extended status)
* @param {Boolean} opts.walletCheck (optional: run v8 walletCheck if wallet found)
* @returns {Callback} cb - Returns error or an object with status information
*/
API.prototype.getStatusByIdentifier = function(opts, cb) {
Expand All @@ -2546,7 +2546,7 @@ API.prototype.getStatusByIdentifier = function(opts, cb) {

var qs = [];
qs.push('includeExtendedInfo=' + (opts.includeExtendedInfo ? '1' : '0'));
qs.push('twoStep=' + (opts.twoStep ? '1' : '0'));
qs.push('walletCheck=' + (opts.walletCheck ? '1' : '0'));

self._doGetRequest('/v1/wallets/' + opts.identifier + '?' + qs.join('&'), function(err, result) {
if (err || !result || !result.wallet) return cb(err);
Expand Down
7 changes: 0 additions & 7 deletions packages/bitcore-wallet-service/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@ var config = {
uri: 'mongodb://localhost:27017/bws',
},
},
lockOpts: {
// To use locker-server, uncomment this:
lockerServer: {
host: 'localhost',
port: 3231,
},
},
messageBrokerOpts: {
// To use message broker server, uncomment this:
messageBrokerServer: {
Expand Down
33 changes: 12 additions & 21 deletions packages/bitcore-wallet-service/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ npm start
### Configuration
Configuration for all required modules can be specified in https://github.com/bitpay/bitcore-wallet-service/blob/master/config.js

BWS is composed of 5 separate node services -
Locker - locker/locker.js
BWS is composed of 4 separate node services -
Message Broker - messagebroker/messagebroker.js
Blockchain Monitor - bcmonitor/bcmonitor.js (This service talks to the Blockchain Explorer service configured under blockchainExplorerOpts - see Configure blockchain service below.)
Email Service - emailservice/emailservice.js
Expand All @@ -45,16 +44,6 @@ Example configuration for connecting to the MongoDB instance:
},
}
```
#### Configure Locker service
Example configuration for connecting to locker service:
```javascript
lockOpts: {
lockerServer: {
host: 'localhost',
port: 3231,
},
}
```

#### Configure Message Broker service
Example configuration for connecting to message broker service:
Expand All @@ -66,19 +55,21 @@ Example configuration for connecting to message broker service:
}
```

#### Configure blockchain service
#### Configure blockchain service. Bitcore v8 is required.
Note: this service will be used by blockchain monitor service as well as by BWS itself.
An example of this configuration is:
```javascript
blockchainExplorerOpts: {
livenet: {
provider: 'insight',
url: 'https://insight.bitpay.com:443',
},
testnet: {
provider: 'insight',
url: 'https://test-insight.bitpay.com:443',
},
'btc': {
livenet: {
provider: 'v8',
url: 'https://insight.bitpay.com:443',
},
testnet: {
provider: 'v8',
url: 'https://test-insight.bitpay.com:443',
},
},
}
```

Expand Down
5 changes: 3 additions & 2 deletions packages/bitcore-wallet-service/lib/emailservice.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ EmailService.prototype.start = function(opts, cb) {
done();
},
function(done) {
self.lock = opts.lock || new Lock(opts.lockOpts);
self.lock = opts.lock || new Lock(self.storage, opts.lockOpts);
done();
},
function(done) {
Expand Down Expand Up @@ -233,6 +233,7 @@ EmailService.prototype._getDataForTemplate = function(notification, recipient, c
return cb(new Error('Could not format amount', ex));
}
}

self.storage.fetchWallet(notification.walletId, function(err, wallet) {
if (err) return cb(err);
data.walletId = wallet.id;
Expand Down Expand Up @@ -355,7 +356,7 @@ EmailService.prototype.sendEmail = function(notification, cb) {
// TODO: Optimize so one process does not have to wait until all others are done
// Instead set a flag somewhere in the db to indicate that this process is free
// to serve another request.
self.lock.runLocked('email-' + notification.id, cb, function(cb) {
self.lock.runLocked('email-' + notification.id, {}, cb, function(cb) {
self.storage.fetchEmailByNotification(notification.id, function(err, email) {
if (err) return cb(err);
if (email) return cb();
Expand Down
2 changes: 1 addition & 1 deletion packages/bitcore-wallet-service/lib/expressapp.js
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ ExpressApp.prototype.start = function(opts, cb) {
}, function(server) {
var opts = {
identifier: req.params['identifier'],
walletCheck: req.params['walletCheck'],
};
server.getWalletFromIdentifier(opts, function(err, wallet) {
if (err) return returnError(err, res, req);
Expand All @@ -319,7 +320,6 @@ ExpressApp.prototype.start = function(opts, cb) {
server.walletId = wallet.id;
var opts = {};
if (req.query.includeExtendedInfo == '1') opts.includeExtendedInfo = true;
if (req.query.twoStep == '1') opts.twoStep = true;
server.getStatus(opts, function(err, status) {
if (err) return returnError(err, res, req);
res.json(status);
Expand Down
63 changes: 0 additions & 63 deletions packages/bitcore-wallet-service/lib/locallock.js

This file was deleted.

93 changes: 71 additions & 22 deletions packages/bitcore-wallet-service/lib/lock.js
Original file line number Diff line number Diff line change
@@ -1,47 +1,96 @@
var $ = require('preconditions').singleton();
const $ = require('preconditions').singleton();
var _ = require('lodash');
var log = require('npmlog');
log.debug = log.verbose;
log.disableColor();

var LocalLock = require('./locallock');
var RemoteLock = require('locker');
var Common = require('./common');
var Defaults = Common.Defaults;

var Errors = require('./errors/errordefinitions');

function Lock(opts) {

const ACQUIRE_RETRY_STEP = 50; //ms

function Lock(storage, opts) {
opts = opts || {};
if (opts.lockerServer) {
this.lock = new RemoteLock(opts.lockerServer.port, opts.lockerServer.host);

log.info('Using locker server:' + opts.lockerServer.host + ':' + opts.lockerServer.port);

this.lock.on('reset', function() {
log.debug('Locker server reset');
});
this.lock.on('error', function(error) {
log.error('Locker server threw error', error);
});
} else {
this.lock = new LocalLock();
}

this.storage = storage;
};

Lock.prototype.runLocked = function(token, cb, task, waitTime) {

Lock.prototype.acquire = function(token, opts, cb, timeLeft) {
var self = this;
opts = opts || {};

opts.lockTime = opts.lockTime || Defaults.LOCK_EXE_TIME;

this.storage.acquireLock(token, (err) => {

// Lock taken?
if(err && err.message && err.message.indexOf('E11000 ') !== -1) {

// Waiting time for lock has expired
if (timeLeft < 0) {
return cb('LOCKED');
}


if (!_.isUndefined(opts.waitTime)) {
if (_.isUndefined(timeLeft)) {
timeLeft = opts.waitTime;
} else {
timeLeft -= ACQUIRE_RETRY_STEP;
}
}

return setTimeout(self.acquire.bind(self,token, opts, cb, timeLeft), ACQUIRE_RETRY_STEP);

// Actual DB error
} else if (err) {
return cb(err);

// Lock available
} else {

var lockTimerId;

function release(icb) {
if (!icb) icb = () => {};

clearTimeout(lockTimerId);

self.storage.releaseLock(token, icb);
}


lockTimerId = setTimeout(() => {
release();
}, opts.lockTime);

return cb(null, (icb) => {
release(icb);
});
}
});
};

Lock.prototype.runLocked = function(token, opts, cb, task) {
$.shouldBeDefined(token);

waitTime = waitTime || Defaults.LOCK_WAIT_TIME;
this.acquire(token, opts, function(err, release) {
if (err == 'LOCKED' ) return cb(Errors.WALLET_BUSY);
if (err) return cb(err);


this.lock.locked(token, waitTime , Defaults.LOCK_EXE_TIME, function(err, release) {
if (err) return cb(Errors.WALLET_BUSY);
var _cb = function() {

cb.apply(null, arguments);
release();
};
task(_cb);
});
};


module.exports = Lock;
20 changes: 16 additions & 4 deletions packages/bitcore-wallet-service/lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ WalletService.initialize = function(opts, cb) {
$.shouldBeFunction(cb);

opts = opts || {};
lock = opts.lock || new Lock(opts.lockOpts);
blockchainExplorer = opts.blockchainExplorer;
blockchainExplorerOpts = opts.blockchainExplorerOpts;

Expand Down Expand Up @@ -164,6 +163,8 @@ WalletService.initialize = function(opts, cb) {
initFiatRateService(next);
},
], function(err) {
lock = opts.lock || new Lock(storage, opts.lockOpts);

if (err) {
log.error('Could not initialize', err);
throw err;
Expand Down Expand Up @@ -319,7 +320,7 @@ WalletService.getInstanceWithAuth = function(opts, cb) {
WalletService.prototype._runLocked = function(cb, task, waitTime) {
$.checkState(this.walletId);

this.lock.runLocked(this.walletId, cb, task, waitTime);
this.lock.runLocked(this.walletId, {waitTime: waitTime}, cb, task);
};


Expand Down Expand Up @@ -509,13 +510,23 @@ WalletService.prototype.getWallet = function(opts, cb) {
* Retrieves a wallet from storage.
* @param {Object} opts
* @param {string} opts.identifier - The identifier associated with the wallet (one of: walletId, address, txid).
* @param {string} opts.walletCheck - Check v8 wallet sync
* @returns {Object} wallet
*/
WalletService.prototype.getWalletFromIdentifier = function(opts, cb) {
var self = this;

if (!opts.identifier) return cb();


function end(err, ret) {
if (opts.walletCheck && !err && ret) {
return self.syncWallet(ret, cb);
} else {
return cb(err, ret);
}
}

var walletId;
async.parallel([

Expand All @@ -540,7 +551,7 @@ WalletService.prototype.getWalletFromIdentifier = function(opts, cb) {
], function(err) {
if (err) return cb(err);
if (walletId) {
return self.storage.fetchWallet(walletId, cb);
return self.storage.fetchWallet(walletId, end);
}

var re = /^[\da-f]+$/gi;
Expand Down Expand Up @@ -575,7 +586,7 @@ WalletService.prototype.getWalletFromIdentifier = function(opts, cb) {
});
}, function() {
if (!walletId) return cb();
return self.storage.fetchWallet(walletId, cb);
return self.storage.fetchWallet(walletId, end);
});
});
};
Expand Down Expand Up @@ -2513,6 +2524,7 @@ WalletService.prototype.publishTx = function(opts, cb) {
if (!checkRequired(opts, ['txProposalId', 'proposalSignature'], cb)) return;

self._runLocked(cb, function(cb) {

self.getWallet({}, function(err, wallet) {
if (err) return cb(err);

Expand Down
Loading

0 comments on commit e9eec4a

Please sign in to comment.