Skip to content

Commit

Permalink
feat: introduce a new Monitor type for server monitoring
Browse files Browse the repository at this point in the history
The `Monitor` replaces the legacy `monitorServer` function and
completely manages the process of the server monitoring component
of SDAM.

NODE-2386
  • Loading branch information
mbroadst committed Dec 30, 2019
1 parent c15c359 commit 2bfe2a1
Show file tree
Hide file tree
Showing 2 changed files with 387 additions and 90 deletions.
293 changes: 207 additions & 86 deletions lib/core/sdam/monitor.js
Original file line number Diff line number Diff line change
@@ -1,126 +1,247 @@
'use strict';

const ServerDescription = require('./server_description').ServerDescription;
const ServerType = require('./common').ServerType;
const calculateDurationInMs = require('../utils').calculateDurationInMs;
const EventEmitter = require('events');
const connect = require('../connection/connect');
const Connection = require('../../cmap/connection').Connection;
const common = require('./common');
const makeStateMachine = require('../utils').makeStateMachine;
const MongoError = require('../error').MongoError;

const sdamEvents = require('./events');
const ServerHeartbeatStartedEvent = sdamEvents.ServerHeartbeatStartedEvent;
const ServerHeartbeatSucceededEvent = sdamEvents.ServerHeartbeatSucceededEvent;
const ServerHeartbeatFailedEvent = sdamEvents.ServerHeartbeatFailedEvent;

// pulled from `Server` implementation
const STATE_CLOSED = 'closed';
const STATE_CLOSING = 'closing';

/**
* Performs a server check as described by the SDAM spec.
*
* NOTE: This method automatically reschedules itself, so that there is always an active
* monitoring process
*
* @param {Server} server The server to monitor
*/
function monitorServer(server, options) {
options = options || {};
const heartbeatFrequencyMS = options.heartbeatFrequencyMS || 10000;

if (options.initial === true) {
server.s.monitorId = setTimeout(() => monitorServer(server), heartbeatFrequencyMS);
return;
const kServer = Symbol('server');
const kMonitorId = Symbol('monitorId');
const kConnection = Symbol('connection');
const kCancellationToken = Symbol('cancellationToken');
const kLastCheckTime = Symbol('lastCheckTime');

const STATE_CLOSED = common.STATE_CLOSED;
const STATE_CLOSING = common.STATE_CLOSING;
const STATE_IDLE = 'idle';
const STATE_MONITORING = 'monitoring';
const stateTransition = makeStateMachine({
[STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED],
[STATE_CLOSED]: [STATE_CLOSED, STATE_MONITORING],
[STATE_IDLE]: [STATE_IDLE, STATE_MONITORING, STATE_CLOSING],
[STATE_MONITORING]: [STATE_MONITORING, STATE_IDLE, STATE_CLOSING]
});

const INVALID_REQUEST_CHECK_STATES = new Set([STATE_CLOSING, STATE_CLOSED, STATE_MONITORING]);

class Monitor extends EventEmitter {
constructor(server, options) {
super(options);

this[kServer] = server;
this[kConnection] = undefined;
this[kCancellationToken] = new EventEmitter();
this[kCancellationToken].setMaxListeners(Infinity);
this.s = {
state: STATE_CLOSED
};

this.address = server.description.address;
this.options = Object.freeze({
connectTimeoutMS:
typeof options.connectionTimeout === 'number' ? options.connectionTimeout : 10000,
heartbeatFrequencyMS:
typeof options.heartbeatFrequencyMS === 'number' ? options.heartbeatFrequencyMS : 10000,
minHeartbeatFrequencyMS:
typeof options.minHeartbeatFrequencyMS === 'number' ? options.minHeartbeatFrequencyMS : 500
});

// TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
const addressParts = server.description.address.split(':');
this.connectOptions = Object.freeze(
Object.assign(
{
host: addressParts[0],
port: parseInt(addressParts[1], 10),
bson: server.s.bson,
connectionType: Connection
},
server.s.options,

// force BSON serialization options
{
raw: false,
promoteLongs: true,
promoteValues: true,
promoteBuffers: true
}
)
);
}

const rescheduleMonitoring = () => {
server.s.monitoring = false;
server.s.monitorId = setTimeout(() => {
server.s.monitorId = undefined;
server.monitor();
}, heartbeatFrequencyMS);
};
connect() {
if (this.s.state !== STATE_CLOSED) {
return;
}

// executes a single check of a server
const checkServer = callback => {
let start = process.hrtime();
monitorServer(this);
}

// emit a signal indicating we have started the heartbeat
server.emit('serverHeartbeatStarted', new ServerHeartbeatStartedEvent(server.name));
requestCheck() {
if (INVALID_REQUEST_CHECK_STATES.has(this.s.state)) {
return;
}

// NOTE: legacy monitoring event
process.nextTick(() => server.emit('monitoring', server));
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
const remainingTime = heartbeatFrequencyMS - calculateDurationInMs(this[kLastCheckTime]);
if (remainingTime > minHeartbeatFrequencyMS && this[kMonitorId]) {
clearTimeout(this[kMonitorId]);
rescheduleMonitoring(this, minHeartbeatFrequencyMS);
return;
}

server.command(
'admin.$cmd',
{ ismaster: true },
{
monitoring: true,
socketTimeout: server.s.options.connectionTimeout || 2000
},
(err, result) => {
let duration = calculateDurationInMs(start);
if (this[kMonitorId]) {
clearTimeout(this[kMonitorId]);
}

if (err) {
server.emit(
'serverHeartbeatFailed',
new ServerHeartbeatFailedEvent(duration, err, server.name)
);
monitorServer(this);
}

return callback(err, null);
}
close() {
if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
return;
}

// save round trip time
server.description.roundTripTime = duration;
stateTransition(this, STATE_CLOSING);
this[kCancellationToken].emit('cancel');
if (this[kMonitorId]) {
clearTimeout(this[kMonitorId]);
}

const isMaster = result.result;
server.emit(
'serverHeartbeatSucceeded',
new ServerHeartbeatSucceededEvent(duration, isMaster, server.name)
);
if (this[kConnection]) {
this[kConnection].destroy({ force: true });
}

return callback(null, isMaster);
this.emit('close');
stateTransition(this, STATE_CLOSED);
}
}

function checkServer(monitor, callback) {
if (monitor[kConnection] && monitor[kConnection].closed) {
monitor[kConnection] = undefined;
}

monitor.emit('serverHeartbeatStarted', new ServerHeartbeatStartedEvent(monitor.address));

if (monitor[kConnection] != null) {
const connectTimeoutMS = monitor.options.connectTimeoutMS;
monitor[kConnection].command(
'admin.$cmd',
{ ismaster: true },
{ socketTimeout: connectTimeoutMS },
(err, isMaster) => {
if (err) {
return callback(err);
}

return callback(undefined, isMaster);
}
);
};

const successHandler = isMaster => {
// emit an event indicating that our description has changed
server.emit('descriptionReceived', new ServerDescription(server.description.address, isMaster));
if (server.s.state === STATE_CLOSED || server.s.state === STATE_CLOSING) {
return;
}

// connecting does an implicit `ismaster`
connect(monitor.connectOptions, monitor[kCancellationToken], (err, conn) => {
if (err) {
monitor[kConnection] = undefined;
callback(err);
return;
}

if (monitor.s.state === STATE_CLOSING || monitor.s.state === STATE_CLOSED) {
conn.destroy({ force: true });
callback(new MongoError('monitor was destroyed'));
return;
}

rescheduleMonitoring();
};
monitor[kConnection] = conn;
callback(undefined, conn.description);
});
}

function monitorServer(monitor) {
const start = process.hrtime();
stateTransition(monitor, STATE_MONITORING);

// TODO: the next line is a legacy event, remove in v4
process.nextTick(() => monitor.emit('monitoring', monitor[kServer]));

checkServer(monitor, (err, isMaster) => {
if (isMaster) {
successHandler(monitor, start, isMaster);
return;
}

// run the actual monitoring loop
server.s.monitoring = true;
checkServer((err, isMaster) => {
if (!err) {
successHandler(isMaster);
// otherwise an error occured on initial discovery, also bail
if (monitor[kServer].description.type === ServerType.Unknown) {
failureHandler(monitor, start, err);
return;
}

// According to the SDAM specification's "Network error during server check" section, if
// an ismaster call fails we reset the server's pool. If a server was once connected,
// change its type to `Unknown` only after retrying once.
server.s.pool.reset(() => {
// otherwise re-attempt monitoring once
checkServer((error, isMaster) => {
if (error) {
// we revert to an `Unknown` by emitting a default description with no isMaster
server.emit(
'descriptionReceived',
new ServerDescription(server.description.address, null, { error })
);

rescheduleMonitoring();
return;
}
monitor.emit('resetConnectionPool');

checkServer(monitor, (error, isMaster) => {
if (error) {
// NOTE: using the _first_ error encountered here
failureHandler(monitor, start, err);
return;
}

successHandler(isMaster);
});
successHandler(monitor, start, isMaster);
});
});
}

function rescheduleMonitoring(monitor, ms) {
const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS;
if (monitor.s.state === STATE_CLOSING || monitor.s.state === STATE_CLOSED) {
return;
}

monitor[kLastCheckTime] = process.hrtime();
monitor[kMonitorId] = setTimeout(() => {
monitor[kMonitorId] = undefined;
monitor.requestCheck();
}, ms || heartbeatFrequencyMS);

stateTransition(monitor, STATE_IDLE);
}

function successHandler(monitor, start, isMaster) {
process.nextTick(() =>
monitor.emit(
'serverHeartbeatSucceeded',
new ServerHeartbeatSucceededEvent(calculateDurationInMs(start), isMaster, monitor.address)
)
);

rescheduleMonitoring(monitor);
}

function failureHandler(monitor, start, err) {
monitor.emit(
'serverHeartbeatFailed',
new ServerHeartbeatFailedEvent(calculateDurationInMs(start), err, monitor.address)
);

rescheduleMonitoring(monitor);
}

module.exports = {
monitorServer
Monitor
};
Loading

0 comments on commit 2bfe2a1

Please sign in to comment.