Skip to content

Commit

Permalink
feat(db-ql): add aggregate() feat & tests;
Browse files Browse the repository at this point in the history
  • Loading branch information
maslow committed Oct 18, 2021
1 parent 2e7ac00 commit efce38a
Show file tree
Hide file tree
Showing 11 changed files with 284 additions and 36 deletions.
2 changes: 1 addition & 1 deletion packages/database-ql/src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ tnpm run tstest

## 整体设计

- 使用`document.get()`获取数据时,把`where()``orderBy()``limit()``offser()`、设置的数据拼接到请求里。
- 使用`document.get()`获取数据时,把`where()``orderBy()``limit()``offset()`、设置的数据拼接到请求里。
- 对后台返回的数据进行格式化,使其成为一个`DocumentSnapshot`对象,对特殊类型的字段,如地理位置、日期时间进行处理。
- 使用`document.set()``document.update()`时,把数据进行编码,尤其是特殊字段的处理,编码成后端接口的数据格式。

Expand Down
82 changes: 64 additions & 18 deletions packages/database-ql/src/aggregate.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,69 @@
import { ActionType } from './constant'
import { Db } from './index'
import { RequestInterface } from './interface'
import { GetRes } from './result-types'
// import { EJSON } from 'bson'
import { EJSON } from 'bson'
import { QuerySerializer } from './serializer/query'
import { stringifyByEJSON } from './utils/utils'
import { getType } from './utils/type'
import { Validate } from './validate'
import { AggregateStage, RequestInterface } from './interface'
import { ActionType } from './constant'
import { GetRes } from './result-types'

const EARTH_RADIUS = 6378100

export default class Aggregation {
_db: Db
_request: RequestInterface
_stages: any[]
_stages: AggregateStage[]
_collectionName: string
constructor(db?: Db, collectionName?: string) {

constructor(db?: Db, collectionName?: string, rawPipeline?: any[]) {
this._stages = []

if (db && collectionName) {
this._db = db
this._request = this._db.request
this._collectionName = collectionName

if (rawPipeline && rawPipeline.length > 0) {
rawPipeline.forEach((stage) => {
Validate.isValidAggregation(stage)
const stageName = Object.keys(stage)[0]
this._pipe(stageName, stage[stageName], true)
})
}
}
}

async end<T = any>(): Promise<GetRes<T>> {
async end<T = any>() {
if (!this._collectionName || !this._db) {
throw new Error('Aggregation pipeline cannot send request')
}
const result = await this._request.send(ActionType.aggregate, {

if (!this._stages?.length) {
throw new Error('Aggregation stage cannot be empty')
}

const res = await this._request.send(ActionType.aggregate, {
collectionName: this._collectionName,
// stages: this._stages
stages: this._stages
})
if (result && result.data && result.data.list) {

if (res.error) {
return {
code: result.code,
ok: result.error ? false: true,
requestId: result.requestId,
// data: JSON.parse(result.data.list).map(EJSON.parse)
data: result.data.list
error: res.error,
data: res.data,
requestId: res.requestId,
ok: false,
code: res.code
}
}

const documents = res.data.list.map(EJSON.parse)
const result: GetRes<T> = {
data: documents,
requestId: res.requestId,
ok: true
}
return result
}

Expand All @@ -51,10 +79,18 @@ export default class Aggregation {
})
}

_pipe(stage, param) {
_pipe(stage: string, param: any, raw = false) {
// 区分param是否为字符串
let transformParam = ''
if (getType(param) === 'object') {
transformParam = stringifyByEJSON(param)
} else {
transformParam = JSON.stringify(param)
}

this._stages.push({
stageKey: `$${stage}`,
stageValue: JSON.stringify(param)
stageKey: raw ? stage : `$${stage}`,
stageValue: transformParam
})
return this
}
Expand All @@ -76,6 +112,16 @@ export default class Aggregation {
}

geoNear(param) {
if (param.query) {
param.query = QuerySerializer.encode(param.query)
}

// 判断是否有 distanceMultiplier 参数
if (param.distanceMultiplier && typeof (param.distanceMultiplier) === 'number') {
param.distanceMultiplier = param.distanceMultiplier * EARTH_RADIUS
} else {
param.distanceMultiplier = EARTH_RADIUS
}
return this._pipe('geoNear', param)
}

Expand Down
4 changes: 2 additions & 2 deletions packages/database-ql/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class CollectionReference extends Query {
return docRef.create(data, options)
}

aggregate() {
return new Aggregation(this._db, this._coll)
aggregate(rawPipeline: object[] = []) {
return new Aggregation(this._db, this._coll, rawPipeline)
}
}
2 changes: 1 addition & 1 deletion packages/database-ql/src/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ enum ActionType {
update = 'database.updateDocument',
count = 'database.countDocument',
remove = 'database.deleteDocument',
aggregate = 'database.aggregate'
aggregate = 'database.aggregateDocuments'
}

export {
Expand Down
15 changes: 15 additions & 0 deletions packages/database-ql/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,18 @@ export interface ProjectionType {
[field: string]: 0 | 1
}

export interface AggregateStage {
stageKey: string,
stageValue: any
}

export interface QueryParam {

collectionName: string

/**
* Query options
*/
query?: Object
order?: QueryOrder[]
offset?: number
Expand All @@ -36,4 +46,9 @@ export interface QueryParam {
merge?: boolean
upsert?: boolean
data?: any

/**
* Aggregate stages
*/
stages?: AggregateStage[]
}
33 changes: 33 additions & 0 deletions packages/database-ql/src/utils/utils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { EJSON } from 'bson'
import { isObject } from './type'

export const sleep = (ms: number = 0) => new Promise(r => setTimeout(r, ms))

const counters: Record<string, number> = {}
Expand All @@ -8,3 +11,33 @@ export const autoCount = (domain: string = 'any'): number => {
}
return counters[domain]++
}


// 递归过滤对象中的undefiend字段
export const filterUndefined = o => {
// 如果不是对象类型,直接返回
if (!isObject(o)) {
return o
}

for (let key in o) {
if (o[key] === undefined) {
delete o[key]
} else if (isObject(o[key])) {
o[key] = filterUndefined(o[key])
}
}

return o
}

export const stringifyByEJSON = params => {
// params中删除undefined的key
params = filterUndefined(params)

return EJSON.stringify(params, { relaxed: false })
}

export const parseByEJSON = params => {
return EJSON.parse(params)
}
62 changes: 50 additions & 12 deletions packages/database-ql/src/validate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
FieldType
} from './constant'
import { Util } from './util'
import { getType } from './utils/type'

/**
* 校验模块
Expand All @@ -15,6 +16,19 @@ import { Util } from './util'
* @internal
*/
export class Validate {
/**
*
* @static
* @param {StageName:{}|string} stage
* @returns {Boolean}
* @memberof Validate
*/
static isValidAggregation(stage: object): Boolean {
if (Object.keys(stage).length !== 1) {
throw new Error('aggregation stage must have one key')
}
return true
}

/**
* 检测地址位置的点
Expand Down Expand Up @@ -51,6 +65,42 @@ export class Validate {
return true
}


static isProjection(param: string, value: object): Boolean {
// 遍历value 的 属性值, 只有1,0,ProjectionOperator 三种类型
if (getType(value) !== 'object') {
throw new Error(`${param} projection must be an object`)
}

for (const key in value) {
const subValue = value[key]
if (getType(subValue) === 'number') {
if (subValue !== 0 && subValue !== 1) {
throw new Error('if the value in projection is of number, it must be 0 or 1')
}
} else if (getType(subValue) === 'object') {
} else {
throw new Error('invalid projection')
}
}

return true
}

static isOrder(param: string, value: Record<string, any>): Boolean {
if (getType(value) !== 'object') {
throw new Error(`${param} order must be an object`)
}

for (let key in value) {
const subValue = value[key]
if (subValue !== 1 && subValue !== -1) {
throw new Error(`order value must be 1 or -1`)
}
}
return true
}

/**
* 是否为合法的排序字符
*
Expand Down Expand Up @@ -104,16 +154,4 @@ export class Validate {
}
return true
}

/**
* DocID 格式是否正确
*
* @param docId
*/
static isDocID(docId: string): Boolean {
if (!/^([a-fA-F0-9]){24}$/.test(docId)) {
throw new Error(ErrorCode.DocIDError)
}
return true
}
}
5 changes: 3 additions & 2 deletions packages/database-ql/tests/units/_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ const Actions = {
get: 'database.queryDocument',
update: 'database.updateDocument',
count: 'database.countDocument',
remove: 'database.deleteDocument'
remove: 'database.deleteDocument',
aggregate: 'database.aggregateDocuments'
}

class MockRequest {
Expand All @@ -27,7 +28,7 @@ class MockRequest {
data = { _id: '0', insertedCount: 0 }
}

if (action === Actions.get) {
if (action === Actions.get || action === Actions.aggregate) {
data = { list: [] }
}

Expand Down
45 changes: 45 additions & 0 deletions packages/database-ql/tests/units/aggregate/index.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@

const { Actions, getDb } = require('../_utils')
const assert = require('assert')
const { ObjectId } = require('bson')

describe('db::aggregate()', () => {
it('aggregate() with raw pipeline should be ok', async () => {
const { db, req } = getDb()
const res = await db.collection('tasks')
.aggregate([
{
$match: {
name: 'laf'
}
},
{
$lookup: {
from: 'users',
localField: 'uid',
foreignField: '_id',
as: 'user'
}
}
])
.end()

console.log(res, req.params)
assert.strictEqual(req.action, Actions.aggregate)
assert.strictEqual(req.params.collectionName, 'tasks')
assert.strictEqual(req.params.stages.length, 2)

assert.ok(!res.code)
assert.ok(res.data instanceof Array)
})

it('aggregate() with empty should be rejected', async () => {
const { db, req } = getDb()
await db.collection('tasks')
.aggregate([])
.end()
.catch(err => {
assert.equal(err.toString(),'Error: Aggregation stage cannot be empty' )
})
})
})
Loading

0 comments on commit efce38a

Please sign in to comment.