-
-
Notifications
You must be signed in to change notification settings - Fork 671
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: 用 mongodb watch 实现数据监听机制;实现访问策略部署后自动应用;
- Loading branch information
Showing
3 changed files
with
148 additions
and
90 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
|
||
import { getTriggers } from "../../api/trigger" | ||
import { Trigger } from "cloud-function-engine" | ||
import { Globals } from "../globals" | ||
import { createLogger } from "../logger" | ||
import { convertActionType } from "../utils" | ||
import { ChangeStreamDocument } from "mongodb" | ||
import { FrameworkScheduler } from "./scheduler" | ||
import { debounce } from 'lodash' | ||
import { applyRules } from "../../api/rules" | ||
import { Constants } from "../../constants" | ||
|
||
|
||
const accessor = Globals.accessor | ||
const logger = createLogger('scheduler') | ||
|
||
/** | ||
* 触发器的调度器单例 | ||
*/ | ||
export const Scheduler = new FrameworkScheduler() | ||
|
||
/** | ||
* 当数据库连接成功时,初始化 scheduler | ||
*/ | ||
accessor.ready.then(async () => { | ||
// 初始化触发器 | ||
const data = await getTriggers() | ||
logger.debug('loadTriggers: ', data) | ||
const triggers = data.map(data => Trigger.fromJson(data)) | ||
Scheduler.init(triggers) | ||
Scheduler.emit('App:ready') | ||
|
||
// 监听数据操作事件 | ||
const db = accessor.db | ||
const stream = db.watch([], { fullDocument: 'updateLookup' }) | ||
stream.on("change", (doc) => { | ||
DatabaseChangeEventCallBack(doc) | ||
}) | ||
|
||
}) | ||
|
||
// 对应用访问策略的函数进行防抖动处理 | ||
const debouncedApplyPolicy = debounce(() => { | ||
applyRules() | ||
.then(() => logger.info('policy rules applied')) | ||
.catch(err => logger.error('policy rules applied failed: ', err)) | ||
}, 2000, { trailing: true, leading: false }) | ||
|
||
/** | ||
* 数据操作事件回调 | ||
* @param doc | ||
*/ | ||
function DatabaseChangeEventCallBack(doc: ChangeStreamDocument) { | ||
const operationType = doc.operationType | ||
const collection = doc.ns.coll | ||
|
||
// 访问策略变更时,加载新的访问规则 | ||
if (collection === Constants.policy_collection && operationType === 'insert') { | ||
debouncedApplyPolicy() | ||
} | ||
|
||
// 触发数据变更事件 | ||
const event = `DatabaseChange:${collection}#${operationType}` | ||
Scheduler.emit(event, doc) | ||
} | ||
|
||
|
||
/** | ||
* @deprecated 未来会去除此实现机制,采取 mongodb watch 机制代替 | ||
* @tip 暂保留此部分代码,以兼容老应用 | ||
* | ||
* 以下为原数据事件实现机制:监听 Accessor 的数据操作事件 | ||
* Accessor Event 机制实现有以下特点(相对于 mongodb watch 实现的机制): | ||
* - 不局限于 Mongodb 数据库 | ||
* - 除更新与删除事件外,可监听到 read 和 count 事件 | ||
* - 不能监听到不是通过 Accessor 操作的数据变化 | ||
* - update remove 操作不能获取受影响数据的标识(_id) | ||
* - 在本服务因意外中止运行期间发生的数据变化无法监听到(而 mongodb watch 机制可以做到从中断中恢复期间的变化监听) | ||
*/ | ||
accessor.on('result', AccessorEventCallBack) | ||
|
||
/** | ||
* 数据操作事件回调 | ||
* @param data | ||
*/ | ||
export function AccessorEventCallBack(data: any) { | ||
// 解决 mongodb _id 对象字符串问题 | ||
const _data = JSON.parse(JSON.stringify(data)) | ||
|
||
const { params, result } = _data | ||
|
||
const op = convertActionType(params.action) | ||
|
||
// 忽略的数据事件 | ||
if (['read', 'count', 'watch'].includes(op)) { | ||
return | ||
} | ||
|
||
// 触发数据事件 | ||
const event = `/db/${params.collection}#${op}` | ||
Scheduler.emit(event, { | ||
exec_params: params, | ||
exec_result: result | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
|
||
import { getFunctionById } from "../../api/function" | ||
import { addFunctionLog } from "../../api/function-log" | ||
import { CloudFunction, TriggerScheduler } from "cloud-function-engine" | ||
import { createLogger } from "../logger" | ||
|
||
|
||
const logger = createLogger('scheduler') | ||
|
||
/** | ||
* 派生类,实现其获取云函数数据的方法 | ||
*/ | ||
export class FrameworkScheduler extends TriggerScheduler { | ||
|
||
/** | ||
* 加载云函数,派生类需要实现此方法 | ||
* @override | ||
* @param func_id | ||
* @returns | ||
*/ | ||
async getFunctionById(func_id: string): Promise<CloudFunction>{ | ||
const funcData = await getFunctionById(func_id) | ||
return new CloudFunction(funcData) | ||
} | ||
|
||
/** | ||
* 该方法父类会调用,重写以记录函数执行日志 | ||
* @override | ||
* @param data | ||
*/ | ||
async addFunctionLog(data: any) { | ||
await addFunctionLog(data) | ||
} | ||
|
||
/** | ||
* 重写以处理调试日志 | ||
* @override | ||
* @param params | ||
*/ | ||
async log(...params: any[]) { | ||
logger.debug(...params) | ||
} | ||
} |