Skip to content

Commit

Permalink
feat(websocket): support websocket in app-service;
Browse files Browse the repository at this point in the history
  • Loading branch information
maslow committed Nov 9, 2021
1 parent 1c12af5 commit 025dd39
Show file tree
Hide file tree
Showing 10 changed files with 360 additions and 13 deletions.
48 changes: 47 additions & 1 deletion packages/app-service/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion packages/app-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
"multer": "^1.4.2",
"node-modules-utils": "^0.6.2",
"nodemailer": "^6.6.3",
"validator": "^12.2.0"
"validator": "^12.2.0",
"ws": "^8.2.3"
},
"devDependencies": {
"@types/dotenv": "^8.2.0",
Expand All @@ -53,6 +54,7 @@
"@types/node": "^16.7.10",
"@types/nodemailer": "^6.4.4",
"@types/validator": "^13.1.3",
"@types/ws": "^8.2.0",
"typescript": "^4.2.3"
},
"nodemonConfig": {
Expand Down
10 changes: 9 additions & 1 deletion packages/app-service/src/cloud-sdk/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { getToken, parseToken } from "../lib/utils/token"
import { invokeInFunction } from "./invoke"
import { createFileStorage } from "../lib/storage"
import { CloudFunction } from "../lib/function"
import { WebSocket } from "ws"
import { WebSocketAgent } from "../lib/ws"


export type InvokeFunctionType = (name: string, param: FunctionContext) => Promise<any>
Expand Down Expand Up @@ -90,6 +92,11 @@ export interface CloudSdkInterface {
* 3. 聚合操作
*/
mongo: MongoDriverObject

/**
* WebSocket 连接例表
*/
sockets: Set<WebSocket>
}


Expand Down Expand Up @@ -123,7 +130,8 @@ export function create() {
mongo: {
client: DatabaseAgent.accessor.conn,
db: DatabaseAgent.accessor.db
}
},
sockets: WebSocketAgent.clients
}
return cloud
}
Expand Down
27 changes: 19 additions & 8 deletions packages/app-service/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* @Author: Maslow<wangfugen@126.com>
* @Date: 2021-07-30 10:30:29
* @LastEditTime: 2021-11-03 16:28:03
* @LastEditTime: 2021-11-09 19:09:30
* @Description:
*/

Expand All @@ -12,6 +12,7 @@ import { router } from './router/index'
import { logger } from './lib/logger'
import { generateUUID } from './lib/utils/rand'
import { initCloudSdkPackage } from './lib/utils/init'
import { WebSocketAgent } from './lib/ws'

initCloudSdkPackage()

Expand All @@ -20,9 +21,9 @@ initCloudSdkPackage()
*/
export * from './cloud-sdk'

const server = express()
server.use(express.json() as any)
server.use(express.urlencoded({
const app = express()
app.use(express.json() as any)
app.use(express.urlencoded({
extended: true
}) as any)

Expand All @@ -37,7 +38,7 @@ process.on('uncaughtException', err => {
/**
* Allow CORS by default
*/
server.all('*', function (_req, res, next) {
app.all('*', function (_req, res, next) {
res.header('Access-Control-Allow-Origin', '*')
res.header('Access-Control-Allow-Headers', 'Authorization, Content-Type')
res.header('Access-Control-Allow-Methods', '*')
Expand All @@ -48,7 +49,7 @@ server.all('*', function (_req, res, next) {
/**
* Parsing bearer token
*/
server.use(function (req, res, next) {
app.use(function (req, res, next) {
const token = splitBearerToken(req.headers['authorization'] ?? '')
const auth = parseToken(token) || null
req['auth'] = auth
Expand All @@ -60,6 +61,16 @@ server.use(function (req, res, next) {
next()
})

server.use(router)
app.use(router)

const server = app.listen(Config.PORT, () => logger.info(`server ${process.pid} listened on ${Config.PORT}`))

/**
* WebSocket upgrade & connect
*/
server.on('upgrade', (req, socket, head) => {
WebSocketAgent.server.handleUpgrade(req, socket as any, head, (client) => {
WebSocketAgent.server.emit('connection', client, req)
})
})

server.listen(Config.PORT, () => logger.info(`server ${process.pid} listened on ${Config.PORT}`))
32 changes: 30 additions & 2 deletions packages/app-service/src/lib/scheduler/scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
/*
* @Author: Maslow<wangfugen@126.com>
* @Date: 2021-07-30 10:30:29
* @LastEditTime: 2021-11-05 14:46:49
* @LastEditTime: 2021-11-09 20:02:51
* @Description:
*/

import { getFunctionById } from "../../api/function"
import { addFunctionLog, CloudFunctionLogStruct } from "../../api/function-log"
import { CloudFunction, TriggerScheduler } from "cloud-function-engine"
import { TriggerScheduler } from "cloud-function-engine"
import { createLogger } from "../logger"
import assert = require("assert")
import { ObjectId } from "bson"
import { WebSocket } from "ws"
import { IncomingMessage } from "http"
import { CloudFunction } from "../function"


const logger = createLogger('scheduler')
Expand All @@ -37,6 +40,31 @@ export class FrameworkScheduler extends TriggerScheduler {
return func
}


/**
* Trigger an websocket event
* @param event the event name
* @param data the params for function
*/
public websocketEmit(event: string, data: any, socket: WebSocket, request?: IncomingMessage) {

// filter triggers by given eventName
const triggers = this.getEventTriggers()
.filter(tri => tri.event === event)

// trigger the functions' execution
for (const tri of triggers) {
const param: any = {
params: data,
method: event,
requestId: `trigger_${tri.id}`,
socket,
headers: request?.headers
}
this.executeFunction(tri.func_id, param, tri)
}
}

/**
* Will be called by TriggerScheduler
* @override
Expand Down
75 changes: 75 additions & 0 deletions packages/app-service/src/lib/ws.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { IncomingMessage } from 'http'
import { RawData, WebSocket, WebSocketServer } from 'ws'
import { logger } from './logger'
import { SchedulerInstance } from './scheduler'


export class WebSocketAgent {
private static _server = null

static get server(): WebSocketServer {
if (!this._server) {
this._server = new WebSocketServer({ noServer: true })
this.server.on('connection', handleSocketConnection)
this.server.on('error', error => logger.error('websocket server got error:', error))
}

return this._server
}

static get clients() {
return this.server.clients
}
}

/**
* Handle socket connection
* @param socket
* @param request
*/
function handleSocketConnection(socket: WebSocket, request: IncomingMessage) {
logger.debug(`socket connected`, request.headers)

socket.on('message', (data, isBinary) => {
handleSocketMessage(socket, data, isBinary)
})

socket.on('error', err => handleSocketError(socket, err))
socket.on('close', (code, reason) => handleSocketClose(socket, code, reason))

SchedulerInstance.websocketEmit('WebSocket:connection', null, socket, request)
}

/**
* Handle socket message
* @param _socket
* @param _data
* @param _isBinary
*/
async function handleSocketMessage(socket: WebSocket, data: RawData, isBinary: boolean) {
const param = { data, isBinary }
SchedulerInstance.websocketEmit('WebSocket:message', param, socket)
}

/**
* Handle socket close
* @param _socket
* @param _code
* @param _reason
*/
function handleSocketClose(socket: WebSocket, code: number, reason: Buffer) {
const param = { code, reason }
SchedulerInstance.websocketEmit('WebSocket:close', param, socket)
}

/**
* Handle socket error
* @param _socket
* @param error
*/
function handleSocketError(socket: WebSocket, error: Error) {
logger.error('websocket got err', error)

const param = error
SchedulerInstance.websocketEmit('WebSocket:error', param, socket)
}
21 changes: 21 additions & 0 deletions packages/system-client/nginx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,30 @@ server {
set $service_id app_$appid;
proxy_pass http://$service_id:8000;
add_header appid $appid;
# proxy_set_header Host $host:$server_port;
# proxy_set_header Host $host;
# proxy_http_version 1.1;
# proxy_set_header Upgrade $http_upgrade;
# proxy_set_header Connection "upgrade";
# proxy_read_timeout 6000s;
}
}

location /socket {
resolver 127.0.0.11;
if ($host ~* "(\w{8}(-\w{4}){3}-\w{12})\.(.+)$") {
set $appid $1;
set $service_id app_$appid;
proxy_pass http://$service_id:8000/;
# add_header appid $appid;
}
proxy_set_header Host $host;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 6000s;
}

location /deploy/incoming {
if ($host ~* "(\w{8}(-\w{4}){3}-\w{12})\.(.+)$") {
set $appid $1;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { response_type } from './response_type'
import { websocket_type } from './websocket_type'

export const global_declare = `
${response_type}
${websocket_type}
declare class FunctionConsole {
private _logs;
Expand Down Expand Up @@ -71,6 +73,11 @@ interface FunctionContext {
* Express Response 对象
*/
response: HttpResponse
/**
* WebSocket 对象
*/
socket?: WebSocket
}
interface IModule {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export class AutoImportTypings {
if (!this.isLoaded('axios')) { this.loadDeclaration('axios') }
if (!this.isLoaded('cloud-function-engine')) { this.loadDeclaration('cloud-function-engine') }
if (!this.isLoaded('mongodb')) { this.loadDeclaration('mongodb') }
if (!this.isLoaded('ws')) { this.loadDeclaration('ws') }
if (!this.isLoaded('@types/node')) { this.loadDeclaration('@types/node') }
}

Expand Down
Loading

0 comments on commit 025dd39

Please sign in to comment.