From 2d039fb8dfb16c987194cdbc7928136501d3ba2a Mon Sep 17 00:00:00 2001 From: zemeolotu Date: Fri, 6 Mar 2020 12:35:00 -0500 Subject: [PATCH 1/2] Refactor out `WebsSocketManager` from `WebSockerServer` --- .../perspective/src/js/perspective.node.js | 174 ++------------- .../src/js/perspective.parallel.js | 59 +---- packages/perspective/src/js/websocket.js | 203 ++++++++++++++++++ packages/perspective/test/js/remote.spec.js | 85 ++++++++ 4 files changed, 309 insertions(+), 212 deletions(-) create mode 100644 packages/perspective/src/js/websocket.js create mode 100644 packages/perspective/test/js/remote.spec.js diff --git a/packages/perspective/src/js/perspective.node.js b/packages/perspective/src/js/perspective.node.js index fd58dbd388..83fe737ad6 100644 --- a/packages/perspective/src/js/perspective.node.js +++ b/packages/perspective/src/js/perspective.node.js @@ -9,6 +9,7 @@ const {Client} = require("./api/client.js"); const {Server} = require("./api/server.js"); +const {WebSocketManager, WebSocketClient} = require("./websocket"); const perspective = require("./perspective.js").default; @@ -53,8 +54,6 @@ SYNC_CLIENT.send({id: -1, cmd: "init"}); module.exports = SYNC_CLIENT; module.exports.sync_module = () => SYNC_SERVER.perspective; -let CLIENT_ID_GEN = 0; - const DEFAULT_ASSETS = [ "@finos/perspective/dist/umd", "@finos/perspective-bench/dist", @@ -88,7 +87,7 @@ function read_promise(filePath) { /** * Host a Perspective server that hosts data, code files, etc. */ -function create_http_server(assets, host_psp) { +function perspective_assets(assets, host_psp) { return async function(request, response) { response.setHeader("Access-Control-Allow-Origin", "*"); response.setHeader("Access-Control-Request-Method", "*"); @@ -145,179 +144,44 @@ function create_http_server(assets, host_psp) { }; } -/** - * A WebSocket server instance for a remote perspective, and convenience HTTP - * file server for easy hosting. - */ -class WebSocketServer extends Server { - constructor({port, assets, host_psp, on_start}) { - super(module.exports); +class WebSocketServer extends WebSocketManager { + constructor({assets, host_psp, port, on_start} = {}) { + super(); port = typeof port === "undefined" ? 8080 : port; assets = assets || ["./"]; // Serve Perspective files through HTTP - this._server = http.createServer(create_http_server(assets, host_psp)); - - this.REQS = {}; - this.REQ_ID_MAP = new Map(); + this._server = http.createServer(perspective_assets(assets, host_psp)); // Serve Worker API through WebSockets this._wss = new WebSocket.Server({noServer: true, perMessageDeflate: true}); // When the server starts, define how to handle messages - this._wss.on("connection", ws => { - ws.isAlive = true; - ws.id = CLIENT_ID_GEN++; + this._wss.on("connection", ws => this.add_connection(ws)); - // Parse incoming messages - ws.on("message", msg => { - ws.isAlive = true; - if (msg === "heartbeat") { - ws.send("heartbeat"); - return; - } - msg = JSON.parse(msg); - const compound_id = `${msg.id}/${ws.id}`; - this.REQ_ID_MAP.set(compound_id, msg.id); - msg.id = compound_id; - this.REQS[msg.id] = {ws, msg}; - try { - // Send all messages to the handler defined in - // Perspective.Server - this.process(msg, ws.id); - } catch (e) { - console.error(e); - } - }); - ws.on("close", () => { - this.clear_views(ws.id); - }); - ws.on("error", console.error); + this._server.on("upgrade", (request, socket, head) => { + console.log("200 *** websocket upgrade ***"); + this._wss.handleUpgrade(request, socket, head, sock => this._wss.emit("connection", sock, request)); }); - // clear invalid connections - setInterval(() => { - this._wss.clients.forEach(function each(ws) { - if (ws.isAlive === false) { - return ws.terminate(); - } - ws.isAlive = false; - }); - }, 30000); - - this._server.on( - "upgrade", - function upgrade(request, socket, head) { - console.log("200 *** websocket upgrade ***"); - this._wss.handleUpgrade( - request, - socket, - head, - function done(sock) { - this._wss.emit("connection", sock, request); - }.bind(this) - ); - }.bind(this) - ); - this._server.listen(port, () => { - console.log(`Listening on port ${port}`); + console.log(`Listening on port ${this._server.address().port}`); if (on_start) { - on_start.bind(this)(); + on_start(); } }); } - /** - * Send an asynchronous message to the Perspective web worker. - * - * If the `transferable` param is set, pass two messages: the string - * representation of the message and then the ArrayBuffer data that needs to - * be transferred. The `is_transferable` flag tells the client to expect the - * next message to be a transferable object. - * - * @param {Object} msg a valid JSON-serializable message to pass to the - * client - * @param {*} transferable a transferable object to be sent to the client - */ - post(msg, transferable) { - const req = this.REQS[msg.id]; - const id = msg.id; - if (req.ws.readyState > 1) { - delete this.REQS[id]; - throw new Error("Connection closed"); - } - msg.id = this.REQ_ID_MAP.get(id); - if (transferable) { - msg.is_transferable = true; - req.ws.send(JSON.stringify(msg)); - req.ws.send(transferable[0]); - } else { - req.ws.send(JSON.stringify(msg)); - } - if (!req.msg.subscribe) { - this.REQ_ID_MAP.delete(id); - delete this.REQS[id]; - } - } - - _host(cache, name, input) { - if (cache[name] !== undefined) { - throw new Error(`"${name}" already exists`); - } - input.on_delete(() => { - delete cache[name]; - }); - cache[name] = input; - } - - /** - * Expose a Perspective `table` through the WebSocket, allowing - * it to be accessed by a unique name from a client. Hosted objects - * are automatically `eject`ed when their `delete()` method is called. - * - * @param {String} name - * @param {perspective.table} table `table` to host. - */ - host_table(name, table) { - this._host(this._tables, name, table); - } - - /** - * Expose a Perspective `view` through the WebSocket, allowing - * it to be accessed by a unique name from a client. Hosted objects - * are automatically `eject`ed when their `delete()` method is called. - * - * @param {String} name - * @param {perspective.view} view `view` to host. - */ - host_view(name, view) { - this._host(this._views, name, view); - } - - /** - * Cease hosting a `table` on this server. Hosted objects - * are automatically `eject`ed when their `delete()` method is called. - * - * @param {String} name - */ - eject_table(name) { - delete this._tables[name]; - } - - /** - * Cease hosting a `view` on this server. Hosted objects - * are automatically `eject`ed when their `delete()` method is called. - * - * @param {String} name - */ - eject_view(name) { - delete this._views[name]; - } - close() { this._server.close(); } } +const websocket = url => { + return new WebSocketClient(new WebSocket(url)); +}; + +module.exports.websocket = websocket; +module.exports.perspective_assets = perspective_assets; module.exports.WebSocketServer = WebSocketServer; +module.exports.WebSocketManager = WebSocketManager; diff --git a/packages/perspective/src/js/perspective.parallel.js b/packages/perspective/src/js/perspective.parallel.js index 32d08632f6..658f1d4ce0 100644 --- a/packages/perspective/src/js/perspective.parallel.js +++ b/packages/perspective/src/js/perspective.parallel.js @@ -10,13 +10,12 @@ import * as defaults from "./config/constants.js"; import {get_config} from "./config"; import {Client} from "./api/client.js"; +const {WebSocketClient} = require("./websocket"); import wasm_worker from "./perspective.wasm.js"; import wasm from "./psp.async.wasm.js"; import {override_config} from "../../dist/esm/config/index.js"; -const HEARTBEAT_TIMEOUT = 15000; - // eslint-disable-next-line max-len const INLINE_WARNING = `Perspective has been compiled in INLINE mode. While Perspective's runtime performance is not affected, you may see smaller assets size and faster engine initial load time using "@finos/perspective-webpack-plugin" to build your application. @@ -121,60 +120,6 @@ class WebWorkerClient extends Client { } } -/** - * Given a WebSocket URL, connect to the socket located at `url`. - * - * The `onmessage` handler receives incoming messages and sends it to the - * WebWorker through `this._handle`. - * - * If the message has a transferable asset, set the `pending_arrow` flag to tell - * the worker the next message is an ArrayBuffer. - */ -class WebSocketClient extends Client { - constructor(url) { - super(); - this._ws = new WebSocket(url); - this._ws.binaryType = "arraybuffer"; - this._ws.onopen = () => { - this.send({id: -1, cmd: "init"}); - }; - const heartbeat = () => { - this._ws.send("heartbeat"); - setTimeout(heartbeat, HEARTBEAT_TIMEOUT); - }; - setTimeout(heartbeat, 15000); - this._ws.onmessage = msg => { - if (msg.data === "heartbeat") { - return; - } - if (this._pending_arrow) { - this._handle({data: {id: this._pending_arrow, data: msg.data}}); - delete this._pending_arrow; - } else { - msg = JSON.parse(msg.data); - - // If the `is_transferable` flag is set, the worker expects the - // next message to be a transferable object. This sets the - // `_pending_arrow` flag, which triggers a special handler for - // the ArrayBuffer containing arrow data. - if (msg.is_transferable) { - this._pending_arrow = msg.id; - } else { - this._handle({data: msg}); - } - } - }; - } - - send(msg) { - this._ws.send(JSON.stringify(msg)); - } - - terminate() { - this._ws.close(); - } -} - /****************************************************************************** * * Web Worker Singleton @@ -224,7 +169,7 @@ const mod = { * @param {*} [config] An optional perspective config object override */ websocket(url = window.location.origin.replace("http", "ws")) { - return new WebSocketClient(url); + return new WebSocketClient(new WebSocket(url)); }, shared_worker(config) { diff --git a/packages/perspective/src/js/websocket.js b/packages/perspective/src/js/websocket.js new file mode 100644 index 0000000000..f0cdf4f65a --- /dev/null +++ b/packages/perspective/src/js/websocket.js @@ -0,0 +1,203 @@ +import {Client} from "./api/client.js"; +import {Server} from "./api/server.js"; + +const HEARTBEAT_TIMEOUT = 15000; +let CLIENT_ID_GEN = 0; + +export class WebSocketClient extends Client { + constructor(ws) { + super(); + this._ws = ws; + this._ws.binaryType = "arraybuffer"; + this._ws.onopen = () => { + this.send({id: -1, cmd: "init"}); + }; + const heartbeat = () => { + this._ws.send("heartbeat"); + setTimeout(heartbeat, HEARTBEAT_TIMEOUT); + }; + setTimeout(heartbeat, 15000); + this._ws.onmessage = msg => { + if (msg.data === "heartbeat") { + return; + } + if (this._pending_arrow) { + this._handle({data: {id: this._pending_arrow, data: msg.data}}); + delete this._pending_arrow; + } else { + msg = JSON.parse(msg.data); + + // If the `is_transferable` flag is set, the worker expects the + // next message to be a transferable object. This sets the + // `_pending_arrow` flag, which triggers a special handler for + // the ArrayBuffer containing arrow data. + if (msg.is_transferable) { + this._pending_arrow = msg.id; + } else { + this._handle({data: msg}); + } + } + }; + } + + send(msg) { + this._ws.send(JSON.stringify(msg)); + } + + terminate() { + return new Promise(resolve => { + this._ws.onclose = resolve; + this._ws.close(); + }); + } +} + +/** + * A WebSocket Manager instance for a remote perspective + */ +export class WebSocketManager extends Server { + constructor(...args) { + super(...args); + this.requests_id_map = new Map(); + this.requests = {}; + this.websockets = {}; + + // clear invalid connections + setInterval(() => { + Object.entries(this.websockets).forEach(([id, ws]) => { + if (ws.isAlive === false) { + delete this.websockets[id]; + return ws.terminate(); + } + ws.isAlive = false; + }); + }, 30000); + } + + /** + * Add a new websocket connection to the manager + * + * The WebsocketManager manages the websocket connection and processes every + * message received from each connections. When a websocket connection is + * `closed`, the websocket manager will clear all subscriptions associated + * with the connection + * + * @param {WebSocket} ws a websocket connection + */ + add_connection(ws) { + ws.isAlive = true; + ws.id = CLIENT_ID_GEN++; + + // Parse incoming messages + ws.on("message", msg => { + ws.isAlive = true; + if (msg === "heartbeat") { + ws.send("heartbeat"); + return; + } + msg = JSON.parse(msg); + try { + // Send all messages to the handler defined in + // Perspective.Server + const compoundId = `${msg.id}/${ws.id}`; + this.requests_id_map.set(compoundId, msg.id); + msg.id = compoundId; + this.requests[msg.id] = {ws, msg}; + this.process(msg, ws.id); + } catch (e) { + console.error(e); + } + }); + ws.on("close", () => { + this.clear_views(ws.id); + }); + ws.on("error", console.error); + } + + /** + * Send an asynchronous message to the Perspective web worker. + * + * If the `transferable` param is set, pass two messages: the string + * representation of the message and then the ArrayBuffer data that needs to + * be transferred. The `is_transferable` flag tells the client to expect the + * next message to be a transferable object. + * + * @param {Object} msg a valid JSON-serializable message to pass to the + * client + * @param {*} transferable a transferable object to be sent to the client + */ + post(msg, transferable) { + const req = this.requests[msg.id]; + const id = msg.id; + if (req.ws.readyState > 1) { + delete this.requests[id]; + throw new Error("Connection closed"); + } + msg.id = this.requests_id_map.get(id); + if (transferable) { + msg.is_transferable = true; + req.ws.send(JSON.stringify(msg)); + req.ws.send(transferable[0]); + } else { + req.ws.send(JSON.stringify(msg)); + } + if (!req.msg.subscribe) { + this.requests_id_map.delete(id); + delete this.requests[id]; + } + } + + _host(cache, name, input) { + if (cache[name] !== undefined) { + throw new Error(`"${name}" already exists`); + } + input.on_delete(() => { + delete cache[name]; + }); + cache[name] = input; + } + + /** + * Expose a Perspective `table` through the WebSocket, allowing + * it to be accessed by a unique name from a client. Hosted objects + * are automatically `eject`ed when their `delete()` method is called. + * + * @param {String} name + * @param {perspective.table} table `table` to host. + */ + host_table(name, table) { + this._host(this._tables, name, table); + } + + /** + * Expose a Perspective `view` through the WebSocket, allowing + * it to be accessed by a unique name from a client. Hosted objects + * are automatically `eject`ed when their `delete()` method is called. + * + * @param {String} name + * @param {perspective.view} view `view` to host. + */ + host_view(name, view) { + this._host(this._views, name, view); + } + + /** + * Cease hosting a `table` on this server. Hosted objects + * are automatically `eject`ed when their `delete()` method is called. + * + * @param {String} name + */ + eject_table(name) { + delete this._tables[name]; + } + + /** + * Cease hosting a `view` on this server. Hosted objects + * are automatically `eject`ed when their `delete()` method is called. + * + * @param {String} name + */ + eject_view(name) { + delete this._views[name]; + } +} diff --git a/packages/perspective/test/js/remote.spec.js b/packages/perspective/test/js/remote.spec.js new file mode 100644 index 0000000000..d06b239411 --- /dev/null +++ b/packages/perspective/test/js/remote.spec.js @@ -0,0 +1,85 @@ +/****************************************************************************** + * + * Copyright (c) 2017, the Perspective Authors. + * + * This file is part of the Perspective library, distributed under the terms of + * the Apache License 2.0. The full license can be found in the LICENSE file. + * + */ + +const perspective = require("../../dist/cjs/perspective.node.js"); + +let server; +let port; + +describe("WebSocketManager", function() { + beforeAll(() => { + server = new perspective.WebSocketServer({port: 0}); + port = server._server.address().port; + }); + + afterAll(() => { + server.close(); + }); + + it("sends initial data client on subscribe", async () => { + const data = [{x: 1}]; + const table = perspective.table(data); + server.host_table("test", table); + + const client = perspective.websocket(`ws://localhost:${port}`); + const client_table = client.open_table("test"); + const client_data = await client_table.view().to_json(); + expect(client_data).toEqual(data); + + await client.terminate(); + server.eject_table("test"); + }); + + it("sends initial data multiples client on subscribe", async () => { + const data = [{x: 1}]; + const table = perspective.table(data); + server.host_table("test", table); + + const client_1 = perspective.websocket(`ws://localhost:${port}`); + const client_2 = perspective.websocket(`ws://localhost:${port}`); + + const client_1_table = client_1.open_table("test"); + const client_2_table = client_2.open_table("test"); + + const client_1_data = await client_1_table.view().to_json(); + const client_2_data = await client_2_table.view().to_json(); + + await client_1.terminate(); + await client_2.terminate(); + + expect(client_1_data).toEqual(data); + expect(client_2_data).toEqual(data); + server.eject_table("test"); + }); + + it("sends updates to client on subscribe", done => { + const data = [{x: 1}]; + const table = perspective.table(data); + server.host_table("test", table); + + const client = perspective.websocket(`ws://localhost:${port}`); + const client_table = client.open_table("test"); + + const client_view = client_table.view(); + const on_update = () => { + client_view.to_json().then(async updated_data => { + server.eject_table("test"); + expect(updated_data).toEqual([{x: 1}, {x: 2}]); + await client.terminate(); + setTimeout(done); + }); + }; + + client_view.on_update(on_update); + client_view.to_json().then(client_data => { + expect(client_data).toEqual(data); + table.update([{x: 2}]); + }); + }); +}); From a64a69f3f21e14ec864c0db9cb6e9e3cd9b8b174 Mon Sep 17 00:00:00 2001 From: zemeolotu Date: Fri, 6 Mar 2020 18:49:58 -0500 Subject: [PATCH 2/2] Add express node.js remote server example --- examples/datasources/index.js | 13 ++- examples/datasources/securities.js | 115 ++++++++++++++++++++++----- examples/remote-express/README.md | 5 ++ examples/remote-express/index.css | 31 ++++++++ examples/remote-express/index.html | 69 ++++++++++++++++ examples/remote-express/package.json | 19 +++++ examples/remote-express/server.js | 27 +++++++ examples/remote/server.js | 107 +------------------------ yarn.lock | 18 ++++- 9 files changed, 276 insertions(+), 128 deletions(-) create mode 100644 examples/remote-express/README.md create mode 100644 examples/remote-express/index.css create mode 100644 examples/remote-express/index.html create mode 100644 examples/remote-express/package.json create mode 100644 examples/remote-express/server.js diff --git a/examples/datasources/index.js b/examples/datasources/index.js index 3d792e1a88..feef240fe0 100644 --- a/examples/datasources/index.js +++ b/examples/datasources/index.js @@ -1 +1,12 @@ -export {datasource as securities} from "./securities"; +/****************************************************************************** + * + * Copyright (c) 2017, the Perspective Authors. + * + * This file is part of the Perspective library, distributed under the terms of + * the Apache License 2.0. The full license can be found in the LICENSE file. + * + */ + +const securities = require("./securities"); + +module.exports.securities = securities; diff --git a/examples/datasources/securities.js b/examples/datasources/securities.js index 5d5f7feeee..459a4aa6c6 100644 --- a/examples/datasources/securities.js +++ b/examples/datasources/securities.js @@ -1,17 +1,53 @@ -import perspective from "@finos/perspective"; +/****************************************************************************** + * + * Copyright (c) 2017, the Perspective Authors. + * + * This file is part of the Perspective library, distributed under the terms of + * the Apache License 2.0. The full license can be found in the LICENSE file. + * + */ -const worker = perspective.shared_worker(); +const perspective = require("@finos/perspective"); -const SECURITIES = ["AAPL.N", "AMZN.N", "QQQ.N", "NVDA.N", "TSLA.N", "FB.N", "MSFT.N", "TLT.N", "XIV.N", "YY.N", "CSCO.N", "GOOGL.N", "PCLN.N"]; +const worker = perspective.shared_worker ? perspective.shared_worker() : perspective; + +// Cache updates for faster update rates (but less data diversity)> +const CACHE_INPUT = false; + +// If cached, how many updates to cache? +const CACHE_ENTRIES = 200; + +// How many rows per update? +const UPDATE_SIZE = 50; + +// Update every N milliseconds +const TICK_RATE = 20; +// Size limit of the server-side table +const TABLE_SIZE = 10000; + +const SECURITIES = ["AAPL.N", "AMZN.N", "QQQ.N", "NVDA.N", "TSLA.N", "FB.N", "MSFT.N", "TLT.N", "XIV.N", "YY.N", "CSCO.N", "GOOGL.N", "PCLN.N"]; const CLIENTS = ["Homer", "Marge", "Bart", "Lisa", "Maggie", "Moe", "Lenny", "Carl", "Krusty"]; -const newRows = () => { +const __CACHE__ = []; + +perspective.initialize_profile_thread(); + +/******************************************************************************* + * + * Slow mode (new rows generated on the fly) + */ + +function choose(choices) { + return choices[Math.floor(Math.random() * choices.length)]; +} + +function newRows(total_rows) { const rows = []; - for (let x = 0; x < 5; x++) { + for (let x = 0; x < total_rows; x++) { rows.push({ - name: SECURITIES[Math.floor(Math.random() * SECURITIES.length)], - client: CLIENTS[Math.floor(Math.random() * CLIENTS.length)], + name: choose(SECURITIES), + client: choose(CLIENTS), lastUpdate: new Date(), chg: Math.random() * 20 - 10, bid: Math.random() * 10 + 90, @@ -20,22 +56,61 @@ const newRows = () => { }); } return rows; -}; +} + +async function init_dynamic({table_size, update_size, tick_rate}) { + // Create a `table`. + const table = worker.table(newRows(table_size), {limit: table_size}); + + // The `table` needs to be registered to a name with the Perspective + // `WebSocketServer` in order for the client to get a proxy handle to it. + + // Loop and update the `table` oocasionally. + (function postRow() { + table.update(newRows(update_size)); + setTimeout(postRow, tick_rate); + })(); + return table; +} + +/******************************************************************************* + * + * Fast mode (rows pre-generated, cached as Arrows) + */ + +async function newArrow(total_rows) { + const table = worker.table(newRows(total_rows)); + const vw = table.view(); + const arrow = await vw.to_arrow(); + vw.delete(); + table.delete(); + return arrow; +} -const getTable = (updating = true) => { - const table = worker.table(newRows(), { - index: "name" - }); - - if (updating) { - const update = () => { - table.update(newRows()); - setTimeout(update, 500); - }; - update(); +async function populate_cache(cache_entries) { + for (let x = 0; x < cache_entries; x++) { + let arrow = await newArrow(); + __CACHE__[x] = arrow; } +} +async function init_cached({table_size, tick_rate, cache_entries}) { + await populate_cache(cache_entries); + const table = worker.table(newRows(table_size), {limit: table_size}); + (function postRow() { + const entry = __CACHE__[Math.floor(Math.random() * __CACHE__.length)]; + table.update(entry); + setTimeout(postRow, tick_rate); + })(); return table; +} + +const getTable = (config = {cached: CACHE_INPUT, tick_rate: TICK_RATE, update_size: UPDATE_SIZE, table_size: TABLE_SIZE, cache_entries: CACHE_ENTRIES}) => { + if (config.cached) { + return init_cached(config); + } else { + return init_dynamic(config); + } }; -export const datasource = getTable(); +module.exports = getTable; diff --git a/examples/remote-express/README.md b/examples/remote-express/README.md new file mode 100644 index 0000000000..102106a5ac --- /dev/null +++ b/examples/remote-express/README.md @@ -0,0 +1,5 @@ +# Remote Example + +An express node.js server example hosting a perspective table to a client-side +perspective running in a WebWorker. Both "State of the World" and subsequent +update data are transferred via Arrow-encoded ArrayBuffers over WebSocket. \ No newline at end of file diff --git a/examples/remote-express/index.css b/examples/remote-express/index.css new file mode 100644 index 0000000000..816a438bc9 --- /dev/null +++ b/examples/remote-express/index.css @@ -0,0 +1,31 @@ +/****************************************************************************** + * + * Copyright (c) 2017, the Perspective Authors. + * + * This file is part of the Perspective library, distributed under the terms of + * the Apache License 2.0. The full license can be found in the LICENSE file. + * + */ + + perspective-viewer { + position: absolute; + top: 0; + left: 0; + right: 0; + bottom: 0; +} + +@media (max-width: 600px) { + html { + overflow: hidden; + } + + body { + position: fixed; + height: 100%; + width: 100%; + margin: 0; + overflow: hidden; + touch-action: none; + } +} \ No newline at end of file diff --git a/examples/remote-express/index.html b/examples/remote-express/index.html new file mode 100644 index 0000000000..ce6e278151 --- /dev/null +++ b/examples/remote-express/index.html @@ -0,0 +1,69 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/examples/remote-express/package.json b/examples/remote-express/package.json new file mode 100644 index 0000000000..1e43abf20e --- /dev/null +++ b/examples/remote-express/package.json @@ -0,0 +1,19 @@ +{ + "name": "remote-express", + "private": true, + "version": "0.4.5", + "description": "An example of 2 Perspectives, one client and one server, streaming via Apache Arrow.", + "scripts": { + "start": "node server.js" + }, + "keywords": [], + "license": "Apache-2.0", + "dependencies": { + "@finos/perspective": "^0.4.5", + "@finos/perspective-viewer": "^0.4.5", + "@finos/perspective-viewer-d3fc": "^0.4.5", + "@finos/perspective-viewer-hypergrid": "^0.4.5", + "express": "^4.17.1", + "express-ws": "^4.0.0" + } +} diff --git a/examples/remote-express/server.js b/examples/remote-express/server.js new file mode 100644 index 0000000000..61c44076bf --- /dev/null +++ b/examples/remote-express/server.js @@ -0,0 +1,27 @@ +/****************************************************************************** + * + * Copyright (c) 2017, the Perspective Authors. + * + * This file is part of the Perspective library, distributed under the terms of + * the Apache License 2.0. The full license can be found in the LICENSE file. + * + */ + +const {WebSocketManager, perspective_assets} = require("@finos/perspective"); +const express = require("express"); +const expressWs = require("express-ws"); +const {securities} = require("../datasources"); + +const app = express(); +expressWs(app); + +// create Perspective WebSocketManager and host table +const manager = new WebSocketManager(); +securities().then(table => manager.host_table("remote_table", table)); + +// add connection to manager whenever a new client connects +app.ws("/subscribe", ws => manager.add_connection(ws)); + +app.use("/", perspective_assets([__dirname], true)); + +const server = app.listen(8080, () => console.log(`Listening on port ${server.address().port}`)); diff --git a/examples/remote/server.js b/examples/remote/server.js index ca0f83b80f..c6092730ef 100644 --- a/examples/remote/server.js +++ b/examples/remote/server.js @@ -8,110 +8,7 @@ */ const perspective = require("@finos/perspective"); - -// Cache updates for faster update rates (but less data diversity)> -var CACHE_INPUT = false; - -// If cached, how many updates to cache? -var CACHE_ENTRIES = 200; - -// How many rows per update? -var UPDATE_SIZE = 50; - -// Update every N milliseconds -var TICK_RATE = 20; - -// Size limit of the server-side table -var TABLE_SIZE = 10000; - -var SECURITIES = ["AAPL.N", "AMZN.N", "QQQ.N", "NVDA.N", "TSLA.N", "FB.N", "MSFT.N", "TLT.N", "XIV.N", "YY.N", "CSCO.N", "GOOGL.N", "PCLN.N"]; -var CLIENTS = ["Homer", "Marge", "Bart", "Lisa", "Maggie", "Moe", "Lenny", "Carl", "Krusty"]; - -var __CACHE__ = []; - -perspective.initialize_profile_thread(); - -/******************************************************************************* - * - * Slow mode (new rows generated on the fly) - */ - -function choose(choices) { - return choices[Math.floor(Math.random() * choices.length)]; -} - -function newRows(total_rows = UPDATE_SIZE) { - var rows = []; - for (var x = 0; x < total_rows; x++) { - rows.push({ - name: choose(SECURITIES), - client: choose(CLIENTS), - lastUpdate: new Date(), - chg: Math.random() * 20 - 10, - bid: Math.random() * 10 + 90, - ask: Math.random() * 10 + 100, - vol: Math.random() * 10 + 100 - }); - } - return rows; -} - -async function init_dynamic() { - // Create a `table`. - const table = perspective.table(newRows(TABLE_SIZE), {limit: TABLE_SIZE}); - - // The `table` needs to be registered to a name with the Perspective - // `WebSocketServer` in order for the client to get a proxy handle to it. - host.host_view("data_source_one", table.view()); - - // Loop and update the `table` oocasionally. - (function postRow() { - table.update(newRows()); - setTimeout(postRow, TICK_RATE); - })(); -} - -/******************************************************************************* - * - * Fast mode (rows pre-generated, cached as Arrows) - */ - -async function newArrow(total_rows) { - var table = perspective.table(newRows(total_rows)); - var vw = table.view(); - var arrow = await vw.to_arrow(); - vw.delete(); - table.delete(); - return arrow; -} - -async function populate_cache() { - for (let x = 0; x < CACHE_ENTRIES; x++) { - let arrow = await newArrow(); - __CACHE__[x] = arrow; - } -} - -async function init_cached() { - await populate_cache(); - const tbl = perspective.table(newRows(TABLE_SIZE), {limit: TABLE_SIZE}); - host.host_view("data_source_one", tbl.view()); - (function postRow() { - const entry = __CACHE__[Math.floor(Math.random() * __CACHE__.length)]; - tbl.update(entry); - setTimeout(postRow, TICK_RATE); - })(); -} - -/******************************************************************************* - * - * Main - */ +const {securities} = require("../datasources"); const host = new perspective.WebSocketServer({assets: [__dirname]}); - -if (CACHE_INPUT) { - init_cached(); -} else { - init_dynamic(); -} +securities().then(table => host.host_view("data_source_one", table.view())); diff --git a/yarn.lock b/yarn.lock index 2d97360e55..367b0227cc 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6074,9 +6074,16 @@ expect@^25.1.0: jest-message-util "^25.1.0" jest-regex-util "^25.1.0" +express-ws@^4.0.0: + version "4.0.0" + resolved "https://registry.npmjs.org/express-ws/-/express-ws-4.0.0.tgz#dabd8dc974516418902a41fe6e30ed949b4d36c4" + integrity sha512-KEyUw8AwRET2iFjFsI1EJQrJ/fHeGiJtgpYgEWG3yDv4l/To/m3a2GaYfeGyB3lsWdvbesjF5XCMx+SVBgAAYw== + dependencies: + ws "^5.2.0" + express@^4.17.1: version "4.17.1" - resolved "https://registry.yarnpkg.com/express/-/express-4.17.1.tgz#4491fc38605cf51f8629d39c2b5d026f98a4c134" + resolved "https://registry.npmjs.org/express/-/express-4.17.1.tgz#4491fc38605cf51f8629d39c2b5d026f98a4c134" integrity sha512-mHJ9O79RqluphRrcw2X/GTh3k9tVv8YcoyY4Kkh4WDMUYKRZUq0h1o0w2rrrxBqM7VoeUVqgb27xlEMXTnYt4g== dependencies: accepts "~1.3.7" @@ -15011,7 +15018,7 @@ wrappy@1: write-file-atomic@2.4.1, write-file-atomic@^2.0.0, write-file-atomic@^2.3.0, write-file-atomic@^3.0.0: version "2.4.1" - resolved "https://registry.yarnpkg.com/write-file-atomic/-/write-file-atomic-2.4.1.tgz#d0b05463c188ae804396fd5ab2a370062af87529" + resolved "https://registry.npmjs.org/write-file-atomic/-/write-file-atomic-2.4.1.tgz#d0b05463c188ae804396fd5ab2a370062af87529" integrity sha512-TGHFeZEZMnv+gBFRfjAcxL5bPHrsGKtnb4qsFAws7/vlh+QfwAaySIw4AXP9ZskTTh5GWu3FLuJhsWVdiJPGvg== dependencies: graceful-fs "^4.1.11" @@ -15045,6 +15052,13 @@ write@1.0.3: dependencies: mkdirp "^0.5.1" +ws@^5.2.0: + version "5.2.2" + resolved "https://registry.npmjs.org/ws/-/ws-5.2.2.tgz#dffef14866b8e8dc9133582514d1befaf96e980f" + integrity sha512-jaHFD6PFv6UgoIVda6qZllptQsMlDEJkTQcybzzXDYM1XO9Y8em691FGMPmM46WGyLU4z9KMgQN+qrux/nhlHA== + dependencies: + async-limiter "~1.0.0" + ws@^6.1.2, ws@^6.2.1: version "6.2.1" resolved "https://registry.yarnpkg.com/ws/-/ws-6.2.1.tgz#442fdf0a47ed64f59b6a5d8ff130f4748ed524fb"