Skip to content

Commit

Permalink
fix(app-service): fix change stream reconnection while mongo connecti…
Browse files Browse the repository at this point in the history
…on losed
  • Loading branch information
maslow committed Dec 27, 2021
1 parent 66e9d7a commit fd8fcd5
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions packages/app-service/src/lib/scheduler/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* @Author: Maslow<wangfugen@126.com>
* @Date: 2021-07-30 10:30:29
* @LastEditTime: 2021-11-17 14:11:16
* @LastEditTime: 2021-12-27 18:35:09
* @Description:
*/

Expand All @@ -26,6 +26,7 @@ export const SchedulerInstance = new FrameworkScheduler()
* Initialize scheduler while db connection is ready
*/
accessor.ready.then(async () => {
logger.error('db connection', 'hahaha')
// initialize triggers
const triggers = await getTriggers()
logger.debug('loadTriggers: ', triggers)
Expand All @@ -38,10 +39,23 @@ accessor.ready.then(async () => {

// watch database operation event through `WatchStream` of mongodb
const db = accessor.db
const stream = db.watch([], { fullDocument: 'updateLookup' })
stream.on("change", (doc) => { DatabaseChangeEventCallBack(doc) })
process.on('SIGINT', () => stream.close())
process.on('SIGTERM', () => stream.close())

function startWatchChangeStream() {
logger.info('start watching change stream')
const stream = db.watch([], { fullDocument: 'updateLookup' })
stream.on("change", (doc) => { DatabaseChangeEventCallBack(doc) })
stream.on('error', (err) => {
logger.error('stream watch error: ', err)
setTimeout(() => {
logger.info('restart watching change stream...')
startWatchChangeStream()
}, 5000)
})
process.on('SIGINT', () => stream.close())
process.on('SIGTERM', () => stream.close())
}

startWatchChangeStream()

// emit `App:ready` event
SchedulerInstance.emit('App:ready')
Expand Down

0 comments on commit fd8fcd5

Please sign in to comment.