Skip to content

Commit

Permalink
MongoDB support
Browse files Browse the repository at this point in the history
  • Loading branch information
jmealo committed Oct 8, 2015
1 parent 57bf531 commit 1147cda
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 50 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ before_script:
- ${PWD}/mongodb-linux-x86_64-3.0.4/bin/mongo lapidus --eval 'printjson(rs.initiate());'
- sleep 20
- ${PWD}/mongodb-linux-x86_64-3.0.4/bin/mongo lapidus --eval 'db.getMongo().setSlaveOk();'
- ${PWD}/mongodb-linux-x86_64-3.0.4/bin/mongo lapidus --eval 'db.getMongo().setSlaveOk();'
- ${PWD}/mongodb-linux-x86_64-3.0.4/bin/mongo lapidus --eval 'db.createUser({user:"lapidus",pwd:"2PQM9aiKMJX5chv76gYdFJNi",roles:[{role:"clusterAdmin",db:"admin"},{role:"readAnyDatabase",db:"admin"},"readWrite"]});'
- ${PWD}/mongodb-linux-x86_64-3.0.4/bin/mongo lapidus --eval 'db.createUser({user:"walt",pwd:"EnlmLSvK6XeneMKZOhXGEnNtvUgHtuFV2ZZi",roles:[{role:"clusterAdmin",db:"admin"},{role:"readAnyDatabase",db:"admin"},"readWrite"]});'
- sudo service mysql stop
Expand Down
20 changes: 18 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
[![Coverage Status](https://coveralls.io/repos/JarvusInnovations/lapidus/badge.svg?branch=master&service=github)](https://coveralls.io/github/JarvusInnovations/lapidus?branch=master)

# Getting Started
Currently MySQL and PostgreSQL databases are supported with MongoDB and Redis support on the way. Lapidus can be used
as a Node.js module or directly from the terminal.
Currently MySQL and PostgreSQL databases are fully supported. MongoDB supports inserts and deletes, however, updates
return a copy of the operation (for example, a `$rename` operation will return a `$set` for the new field and an
`$unset` for the old field) instead of the object as it exists in the database. Redis support is on the way. Lapidus can
currently be used as a daemon or Node.js module. Support for piping line-delimited JSON to other processes is a high
priority.

## PostgreSQL
You'll need PostgreSQL 9.4 or higher with logical replication configured and the decoding_json plugin installed and
Expand Down Expand Up @@ -68,6 +71,19 @@ service mysql restart
service mysql status
```

## MongoDB
We test against MongoDB 3.x, however, older versions should work. You'll need to setup MongoDB as a replica set. If
you're not truly using replication during development you will need to connect and run:

```javascript
// DO NOT DO THIS IN PRODUCTION
rs.initiate()
db.getMongo().setSlaveOk();
```

For more information on setting up replication in MongoDB
[check out the docs](http://docs.mongodb.org/manual/tutorial/deploy-replica-set/).

## Configuration
Lapidus will search for lapidus.json in the current working directory. You can specify a different configuration by
passing it to the constructor or using the ``-c`` flag on the terminal. For a list of command line options run
Expand Down
6 changes: 3 additions & 3 deletions src/mongo-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ var MongoDb = require('./mongo.js'),
function init(config) {
'use strict';

console.log(`MongoDB: Connecting to ${config.database} on ${config.host} as ${config.user}`);
console.log(`MongoDB: Connecting to ${config.database} on ${config.hostname} as ${config.username}`);
}

process.on('message', function (msg) {
Expand All @@ -24,10 +24,9 @@ process.on('message', function (msg) {
var config = config.plugins[plugin];
plugin = require('./plugins/' + plugin);
plugins.push(plugin);
plugin.init(config, pg);
plugin.init(config, mongo);
}


mongo.on('error', function(err) {
console.error(err);
});
Expand All @@ -41,6 +40,7 @@ process.on('message', function (msg) {
if (err) {
throw err;
}
console.log(`MongoDB: Connected`);
});
});
}
Expand Down
207 changes: 173 additions & 34 deletions src/mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,42 @@ const EventEmitter = require('events');
const MongoOplog = require('mongo-oplog');

function Mongo(options) {
var connString;

options = options || {};

// In V8 it's 8.7x slower to lookup an undefined property than to read a boolean value, so we'll explicitly values.
if (!options.connString) {
connString = 'mongodb://';
if (options.username && options.password) {
connString += options.username + ':' + options.password + '@';
}

connString += options.hostname || options.host || '127.0.0.1';

if (options.port) {
connString += ':' + options.port;
}

connString += '/' + (options.database || 'admin');

if (options.replicaSet) {
connString += '?replicaSet=' + options.replicaSet;
}

this.connString = connString;
}

for (var prop in options) {
this[prop] = options.prop;
}

Object.defineProperty(this, "_emitEvents", {
enumerable: false,
writable: true
});

// In V8 it's 8.7x slower to lookup an undefined property than to read a boolean value, so we'll explicitly values.

this._emitEvents = (typeof options.emitEvents === 'boolean') ? options.emitEvents : true;

this.emitInsert = (typeof options.emitInsert === 'boolean') ? options.emitInsert : this._emitEvents;
Expand Down Expand Up @@ -76,52 +103,164 @@ Object.defineProperty(Mongo.prototype, 'emitEvents', {
}
});

/*
var oplog = MongoOplog('mongodb://127.0.0.1:27017/lapidus');
Mongo.prototype.init = function init(cb) {
var oplog = MongoOplog('mongodb://127.0.0.1:27017/lapidus?replicaSet=rs0'),
self = this;

oplog.tail(function(err, stream) {
console.log(oplog.db);
});
oplog.on('error', function (error) {
self.emit('error', error);
});

oplog.on('op', function (data) {
console.log(data);
});
oplog.on('end', function () {
self.emit('end');
});

oplog.on('insert', function (doc) {
console.log(doc.op);
});
this.oplog = oplog;

oplog.on('update', function (doc) {
console.log(doc.op);
});
cb && cb(null, {});
};

oplog.on('delete', function (doc) {
console.log(doc.op._id);
});
Mongo.prototype.start = function(cb) {
var oplog = this.oplog,
self = this;

oplog.on('error', function (error) {
console.log(error);
});
if (!oplog) {
cb(new Error('You must call .init() before you call .start()'), null);
}

oplog.on('end', function () {
console.log('Stream ended');
});
oplog.on('insert', function (doc) {
var o = doc.o,
o2 = doc.o2,
ns = doc.ns;

var event = {
pk: o._id || o2._id,
ns: ns,
item: o
};

if (self.onInsert) {
if (self.onInsertWrapper) {
self.onInsertWrapper(function() {
self.onInsert(event, doc);
});
} else {
self.onInsert(event, doc);
}
}

if (self.onEvent) {
if (self.onEventWrapper) {
self.onEventWrapper(function() {
self.onEvent(event, doc);
});
} else {
self.onEvent(event, doc);
}
}

self.emitInsert && self.emit('insert', event);

if (self.emitEvent) {
event.type = 'insert';
self.emit('event', event);
}
});

oplog.stop(function () {
console.log('server stopped');
});
*/
oplog.on('update', function (doc) {
var o = doc.o,
o2 = doc.o2,
ns = doc.ns;

var event = {
pk: o._id || o2._id,
ns: ns,
item: o
};

if (self.onUpdate) {
if (self.onUpdateWrapper) {
self.onUpdateWrapper(function() {
self.onUpdate(event, doc);
});
} else {
self.onUpdate(event, doc);
}
}

if (self.onEvent) {
if (self.onEventWrapper) {
self.onEventWrapper(function() {
self.onEvent(event, doc);
});
} else {
self.onEvent(event, doc);
}
}

self.emitUpdate && self.emit('update', event);

if (self.emitEvent) {
event.type = 'update';
self.emit('event', event);
}
});

Mongo.prototype.init = function init(cb) {
cb && cb(null, {});
};
oplog.on('delete', function (doc) {
var o = doc.o,
o2 = doc.o2,
ns = doc.ns;

var event = {
pk: o._id || o2._id,
ns: ns
};

if (self.onDelete) {
if (self.onDeleteWrapper) {
self.onDeleteWrapper(function() {
self.onDelete(event, doc);
});
} else {
self.onDelete(event, doc);
}
}

if (self.onEvent) {
if (self.onEventWrapper) {
self.onEventWrapper(function() {
self.onEvent(event, doc);
});
} else {
self.onEvent(event, doc);
}
}

self.emitDelete && self.emit('delete', event);

if (self.emitEvent) {
event.type = 'delete';
self.emit('event', event);
}
});

Mongo.prototype.start = function(cb) {
cb && cb(null, {});
if (typeof cb === 'function') {
oplog.tail(cb);
} else {
oplog.tail();
}
};

Mongo.prototype.stop = function() {
Mongo.prototype.stop = function(cb) {
if (!this.oplog) {
cb(new Error('You must call .init() before you call .stop()'), null);
}

if (typeof cb === 'function') {
this.oplog.stop(cb);
} else {
this.oplog.stop();
}
};

Mongo.prototype.validateConfig = function() {
Expand Down
3 changes: 3 additions & 0 deletions src/plugins/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ module.exports = {
nats = nats.connect(config);

eventEmitter.on('event', function(event) {

if (event.schema && event.table && event.pk) {
nats.publish(event.schema + '.' + event.table + '.' + event.pk, JSON.stringify(event));
} else if (event.ns && event.pk) {
nats.publish(event.ns + '.' + event.pk, JSON.stringify(event));
}
});
},
Expand Down
4 changes: 4 additions & 0 deletions test/config/mongo-only.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
"username": "lapidus",
"database": "lapidus",
"password": "2PQM9aiKMJX5chv76gYdFJNi"
"password": "2PQM9aiKMJX5chv76gYdFJNi",
"replicaSet": "rs0"
},

{
Expand All @@ -14,6 +16,8 @@
"username": "walt",
"database": "lapidus",
"password": "EnlmLSvK6XeneMKZOhXGEnNtvUgHtuFV2ZZi"
"password": "EnlmLSvK6XeneMKZOhXGEnNtvUgHtuFV2ZZi",
"replicaSet": "rs0"
}
]
}
21 changes: 11 additions & 10 deletions test/mongo.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,24 @@ var assert = require('assert'),
path = require('path');

describe('MongoDB', function () {

before(function (done) {
output = spawnSync('node', ['index.js', '-c', './test/config/mongo-only.json'], {timeout: 1500});
done();
});

it('connects to valid MongoDB backend(s)', function () {
assert.equal(output.status, 0);
assert.equal(output.stderr.toString(), '');
});

describe('Can be used as a module', function () {
var mongo,
eventsWrapper;

before(function (done) {
var config = require(path.join(__dirname, './config/mongo-only.json')).backends[0];

// The slot will still be in use from the spawned process above
config.slot = config.slot + '1';

config.onEventsWrapper = setImmediate;

config.onEvent = function () {
console.log('onEvent');
console.log(arguments);
};

mongo = new MongoDb(config);

mongo.init(function(err) {
Expand Down

0 comments on commit 1147cda

Please sign in to comment.