-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: introduce a new
Monitor
type for server monitoring
The `Monitor` replaces the legacy `monitorServer` function and completely manages the process of the server monitoring component of SDAM. NODE-2386
- Loading branch information
Showing
2 changed files
with
387 additions
and
90 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,126 +1,247 @@ | ||
'use strict'; | ||
|
||
const ServerDescription = require('./server_description').ServerDescription; | ||
const ServerType = require('./common').ServerType; | ||
const calculateDurationInMs = require('../utils').calculateDurationInMs; | ||
const EventEmitter = require('events'); | ||
const connect = require('../connection/connect'); | ||
const Connection = require('../../cmap/connection').Connection; | ||
const common = require('./common'); | ||
const makeStateMachine = require('../utils').makeStateMachine; | ||
const MongoError = require('../error').MongoError; | ||
|
||
const sdamEvents = require('./events'); | ||
const ServerHeartbeatStartedEvent = sdamEvents.ServerHeartbeatStartedEvent; | ||
const ServerHeartbeatSucceededEvent = sdamEvents.ServerHeartbeatSucceededEvent; | ||
const ServerHeartbeatFailedEvent = sdamEvents.ServerHeartbeatFailedEvent; | ||
|
||
// pulled from `Server` implementation | ||
const STATE_CLOSED = 'closed'; | ||
const STATE_CLOSING = 'closing'; | ||
|
||
/** | ||
* Performs a server check as described by the SDAM spec. | ||
* | ||
* NOTE: This method automatically reschedules itself, so that there is always an active | ||
* monitoring process | ||
* | ||
* @param {Server} server The server to monitor | ||
*/ | ||
function monitorServer(server, options) { | ||
options = options || {}; | ||
const heartbeatFrequencyMS = options.heartbeatFrequencyMS || 10000; | ||
|
||
if (options.initial === true) { | ||
server.s.monitorId = setTimeout(() => monitorServer(server), heartbeatFrequencyMS); | ||
return; | ||
const kServer = Symbol('server'); | ||
const kMonitorId = Symbol('monitorId'); | ||
const kConnection = Symbol('connection'); | ||
const kCancellationToken = Symbol('cancellationToken'); | ||
const kLastCheckTime = Symbol('lastCheckTime'); | ||
|
||
const STATE_CLOSED = common.STATE_CLOSED; | ||
const STATE_CLOSING = common.STATE_CLOSING; | ||
const STATE_IDLE = 'idle'; | ||
const STATE_MONITORING = 'monitoring'; | ||
const stateTransition = makeStateMachine({ | ||
[STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED], | ||
[STATE_CLOSED]: [STATE_CLOSED, STATE_MONITORING], | ||
[STATE_IDLE]: [STATE_IDLE, STATE_MONITORING, STATE_CLOSING], | ||
[STATE_MONITORING]: [STATE_MONITORING, STATE_IDLE, STATE_CLOSING] | ||
}); | ||
|
||
const INVALID_REQUEST_CHECK_STATES = new Set([STATE_CLOSING, STATE_CLOSED, STATE_MONITORING]); | ||
|
||
class Monitor extends EventEmitter { | ||
constructor(server, options) { | ||
super(options); | ||
|
||
this[kServer] = server; | ||
this[kConnection] = undefined; | ||
this[kCancellationToken] = new EventEmitter(); | ||
this[kCancellationToken].setMaxListeners(Infinity); | ||
this.s = { | ||
state: STATE_CLOSED | ||
}; | ||
|
||
this.address = server.description.address; | ||
this.options = Object.freeze({ | ||
connectTimeoutMS: | ||
typeof options.connectionTimeout === 'number' ? options.connectionTimeout : 10000, | ||
heartbeatFrequencyMS: | ||
typeof options.heartbeatFrequencyMS === 'number' ? options.heartbeatFrequencyMS : 10000, | ||
minHeartbeatFrequencyMS: | ||
typeof options.minHeartbeatFrequencyMS === 'number' ? options.minHeartbeatFrequencyMS : 500 | ||
}); | ||
|
||
// TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration | ||
const addressParts = server.description.address.split(':'); | ||
this.connectOptions = Object.freeze( | ||
Object.assign( | ||
{ | ||
host: addressParts[0], | ||
port: parseInt(addressParts[1], 10), | ||
bson: server.s.bson, | ||
connectionType: Connection | ||
}, | ||
server.s.options, | ||
|
||
// force BSON serialization options | ||
{ | ||
raw: false, | ||
promoteLongs: true, | ||
promoteValues: true, | ||
promoteBuffers: true | ||
} | ||
) | ||
); | ||
} | ||
|
||
const rescheduleMonitoring = () => { | ||
server.s.monitoring = false; | ||
server.s.monitorId = setTimeout(() => { | ||
server.s.monitorId = undefined; | ||
server.monitor(); | ||
}, heartbeatFrequencyMS); | ||
}; | ||
connect() { | ||
if (this.s.state !== STATE_CLOSED) { | ||
return; | ||
} | ||
|
||
// executes a single check of a server | ||
const checkServer = callback => { | ||
let start = process.hrtime(); | ||
monitorServer(this); | ||
} | ||
|
||
// emit a signal indicating we have started the heartbeat | ||
server.emit('serverHeartbeatStarted', new ServerHeartbeatStartedEvent(server.name)); | ||
requestCheck() { | ||
if (INVALID_REQUEST_CHECK_STATES.has(this.s.state)) { | ||
return; | ||
} | ||
|
||
// NOTE: legacy monitoring event | ||
process.nextTick(() => server.emit('monitoring', server)); | ||
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS; | ||
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS; | ||
const remainingTime = heartbeatFrequencyMS - calculateDurationInMs(this[kLastCheckTime]); | ||
if (remainingTime > minHeartbeatFrequencyMS && this[kMonitorId]) { | ||
clearTimeout(this[kMonitorId]); | ||
rescheduleMonitoring(this, minHeartbeatFrequencyMS); | ||
return; | ||
} | ||
|
||
server.command( | ||
'admin.$cmd', | ||
{ ismaster: true }, | ||
{ | ||
monitoring: true, | ||
socketTimeout: server.s.options.connectionTimeout || 2000 | ||
}, | ||
(err, result) => { | ||
let duration = calculateDurationInMs(start); | ||
if (this[kMonitorId]) { | ||
clearTimeout(this[kMonitorId]); | ||
} | ||
|
||
if (err) { | ||
server.emit( | ||
'serverHeartbeatFailed', | ||
new ServerHeartbeatFailedEvent(duration, err, server.name) | ||
); | ||
monitorServer(this); | ||
} | ||
|
||
return callback(err, null); | ||
} | ||
close() { | ||
if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) { | ||
return; | ||
} | ||
|
||
// save round trip time | ||
server.description.roundTripTime = duration; | ||
stateTransition(this, STATE_CLOSING); | ||
this[kCancellationToken].emit('cancel'); | ||
if (this[kMonitorId]) { | ||
clearTimeout(this[kMonitorId]); | ||
} | ||
|
||
const isMaster = result.result; | ||
server.emit( | ||
'serverHeartbeatSucceeded', | ||
new ServerHeartbeatSucceededEvent(duration, isMaster, server.name) | ||
); | ||
if (this[kConnection]) { | ||
this[kConnection].destroy({ force: true }); | ||
} | ||
|
||
return callback(null, isMaster); | ||
this.emit('close'); | ||
stateTransition(this, STATE_CLOSED); | ||
} | ||
} | ||
|
||
function checkServer(monitor, callback) { | ||
if (monitor[kConnection] && monitor[kConnection].closed) { | ||
monitor[kConnection] = undefined; | ||
} | ||
|
||
monitor.emit('serverHeartbeatStarted', new ServerHeartbeatStartedEvent(monitor.address)); | ||
|
||
if (monitor[kConnection] != null) { | ||
const connectTimeoutMS = monitor.options.connectTimeoutMS; | ||
monitor[kConnection].command( | ||
'admin.$cmd', | ||
{ ismaster: true }, | ||
{ socketTimeout: connectTimeoutMS }, | ||
(err, isMaster) => { | ||
if (err) { | ||
return callback(err); | ||
} | ||
|
||
return callback(undefined, isMaster); | ||
} | ||
); | ||
}; | ||
|
||
const successHandler = isMaster => { | ||
// emit an event indicating that our description has changed | ||
server.emit('descriptionReceived', new ServerDescription(server.description.address, isMaster)); | ||
if (server.s.state === STATE_CLOSED || server.s.state === STATE_CLOSING) { | ||
return; | ||
} | ||
|
||
// connecting does an implicit `ismaster` | ||
connect(monitor.connectOptions, monitor[kCancellationToken], (err, conn) => { | ||
if (err) { | ||
monitor[kConnection] = undefined; | ||
callback(err); | ||
return; | ||
} | ||
|
||
if (monitor.s.state === STATE_CLOSING || monitor.s.state === STATE_CLOSED) { | ||
conn.destroy({ force: true }); | ||
callback(new MongoError('monitor was destroyed')); | ||
return; | ||
} | ||
|
||
rescheduleMonitoring(); | ||
}; | ||
monitor[kConnection] = conn; | ||
callback(undefined, conn.description); | ||
}); | ||
} | ||
|
||
function monitorServer(monitor) { | ||
const start = process.hrtime(); | ||
stateTransition(monitor, STATE_MONITORING); | ||
|
||
// TODO: the next line is a legacy event, remove in v4 | ||
process.nextTick(() => monitor.emit('monitoring', monitor[kServer])); | ||
|
||
checkServer(monitor, (err, isMaster) => { | ||
if (isMaster) { | ||
successHandler(monitor, start, isMaster); | ||
return; | ||
} | ||
|
||
// run the actual monitoring loop | ||
server.s.monitoring = true; | ||
checkServer((err, isMaster) => { | ||
if (!err) { | ||
successHandler(isMaster); | ||
// otherwise an error occured on initial discovery, also bail | ||
if (monitor[kServer].description.type === ServerType.Unknown) { | ||
failureHandler(monitor, start, err); | ||
return; | ||
} | ||
|
||
// According to the SDAM specification's "Network error during server check" section, if | ||
// an ismaster call fails we reset the server's pool. If a server was once connected, | ||
// change its type to `Unknown` only after retrying once. | ||
server.s.pool.reset(() => { | ||
// otherwise re-attempt monitoring once | ||
checkServer((error, isMaster) => { | ||
if (error) { | ||
// we revert to an `Unknown` by emitting a default description with no isMaster | ||
server.emit( | ||
'descriptionReceived', | ||
new ServerDescription(server.description.address, null, { error }) | ||
); | ||
|
||
rescheduleMonitoring(); | ||
return; | ||
} | ||
monitor.emit('resetConnectionPool'); | ||
|
||
checkServer(monitor, (error, isMaster) => { | ||
if (error) { | ||
// NOTE: using the _first_ error encountered here | ||
failureHandler(monitor, start, err); | ||
return; | ||
} | ||
|
||
successHandler(isMaster); | ||
}); | ||
successHandler(monitor, start, isMaster); | ||
}); | ||
}); | ||
} | ||
|
||
function rescheduleMonitoring(monitor, ms) { | ||
const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS; | ||
if (monitor.s.state === STATE_CLOSING || monitor.s.state === STATE_CLOSED) { | ||
return; | ||
} | ||
|
||
monitor[kLastCheckTime] = process.hrtime(); | ||
monitor[kMonitorId] = setTimeout(() => { | ||
monitor[kMonitorId] = undefined; | ||
monitor.requestCheck(); | ||
}, ms || heartbeatFrequencyMS); | ||
|
||
stateTransition(monitor, STATE_IDLE); | ||
} | ||
|
||
function successHandler(monitor, start, isMaster) { | ||
process.nextTick(() => | ||
monitor.emit( | ||
'serverHeartbeatSucceeded', | ||
new ServerHeartbeatSucceededEvent(calculateDurationInMs(start), isMaster, monitor.address) | ||
) | ||
); | ||
|
||
rescheduleMonitoring(monitor); | ||
} | ||
|
||
function failureHandler(monitor, start, err) { | ||
monitor.emit( | ||
'serverHeartbeatFailed', | ||
new ServerHeartbeatFailedEvent(calculateDurationInMs(start), err, monitor.address) | ||
); | ||
|
||
rescheduleMonitoring(monitor); | ||
} | ||
|
||
module.exports = { | ||
monitorServer | ||
Monitor | ||
}; |
Oops, something went wrong.