Skip to content

Commit

Permalink
Fix ping loop state, make node server close method async
Browse files Browse the repository at this point in the history
  • Loading branch information
texodus committed Mar 25, 2023
1 parent 83f2919 commit 99ed875
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 111 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"puppeteer": "^13.1.3",
"rimraf": "^2.6.2",
"sinon": "^7.3.1",
"stoppable": "^1.1.0",
"style-loader": "^3.3.1",
"term-img": "^4.1.0",
"timezone-mock": "^1.3.6",
Expand Down
9 changes: 6 additions & 3 deletions packages/perspective/src/js/perspective.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const { Server } = require("./api/server.js");
const { WebSocketManager } = require("./websocket/manager");
const { WebSocketClient } = require("./websocket/client");
const { get_config, get_type_config } = require("./config/index.js");
const stoppable = require("stoppable");

const perspective = require("./perspective.js").default;

Expand Down Expand Up @@ -204,7 +205,9 @@ class WebSocketServer extends WebSocketManager {
assets = assets || ["./"];

// Serve Perspective files through HTTP
this._server = http.createServer(perspective_assets(assets, host_psp));
this._server = stoppable(
http.createServer(perspective_assets(assets, host_psp))
);

// Serve Worker API through WebSockets
this._wss = new WebSocket.Server({
Expand All @@ -230,9 +233,9 @@ class WebSocketServer extends WebSocketManager {
});
}

close() {
async close() {
super.clear();
this._server.close();
await new Promise((x) => this._server.stop(x));
}
}

Expand Down
201 changes: 106 additions & 95 deletions packages/perspective/src/js/websocket/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,103 @@ import { Client } from "../api/client.js";
// Initiate a `ping` to the server every 30 seconds
const PING_TIMEOUT = 30000;
export class WebSocketClient extends Client {
_ping() {
this._ping_loop && this._ws.send("ping");
this._ping_loop = setTimeout(this._ping.bind(this), PING_TIMEOUT);
}

_close() {
clearTimeout(this._ping_loop);
this._ping_loop = undefined;
this._on_close_callback?.();
}

_onmessage(msg) {
if (msg.data === "pong") {
return;
}

if (this._pending_binary) {
// Process a binary being sent by the server, which
// can decide how many chunks to send and the size of each
// chunk.
let binary_msg = msg.data;

this._full_binary.set(
new Uint8Array(binary_msg),
this._total_chunk_length
);
this._total_chunk_length += binary_msg.byteLength;

// Use the total length of the binary from the pre-message
// to decide when to stop waiting for new chunks from the
// server.
if (this._total_chunk_length === this._pending_binary_length) {
// Chunking is complete and the binary has been received
// in full.
binary_msg = this._full_binary.buffer;
} else {
// Wait for another chunk.
return;
}

let result = {
data: {
id: this._pending_binary,
data: binary_msg,
},
};

// make sure on_update callbacks are called with a `port_id`
// AND the transferred binary.
if (this._pending_port_id !== undefined) {
const new_data_with_port_id = {
port_id: this._pending_port_id,
delta: binary_msg,
};
result.data.data = new_data_with_port_id;
}

// Send the joined message to the client for handling.
this._handle(result);

// Reset flags to end special message flow.
delete this._pending_binary;
delete this._pending_binary_length;
delete this._pending_port_id;

this._total_chunk_length = 0;
this._full_binary = null;
} else {
msg = JSON.parse(msg.data);

// If the message has `binary_length` set,the worker expects the
// next message to be a binary message. This sets the
// `_pending_binary` flag, which triggers a special handler for
// the ArrayBuffer containing binary data.
if (msg.binary_length) {
this._pending_binary = msg.id;
this._pending_binary_length = msg.binary_length;

// Check whether the message also contains a `port_id`,
// indicating that we are in an `on_update` callback and
// the pending binary needs to be joined with the port_id
// for on_update handlers to work properly.
if (msg.data && msg.data.port_id !== undefined) {
this._pending_port_id = msg.data.port_id;
}

// Create an empty ArrayBuffer to hold the binary, as it
// will be sent in n >= 1 chunks.
this._full_binary = new Uint8Array(this._pending_binary_length);
} else {
this._handle({
data: msg,
});
}
}
}

constructor(ws) {
super();
this._ws = ws;
Expand All @@ -26,100 +123,9 @@ export class WebSocketClient extends Client {
});
};

const ping = () => {
this._ws.send("ping");
setTimeout(ping, PING_TIMEOUT);
};

setTimeout(ping, PING_TIMEOUT);

this._ws.onmessage = (msg) => {
if (msg.data === "pong") {
return;
}

if (this._pending_binary) {
// Process a binary being sent by the server, which
// can decide how many chunks to send and the size of each
// chunk.
let binary_msg = msg.data;

this._full_binary.set(
new Uint8Array(binary_msg),
this._total_chunk_length
);
this._total_chunk_length += binary_msg.byteLength;

// Use the total length of the binary from the pre-message
// to decide when to stop waiting for new chunks from the
// server.
if (this._total_chunk_length === this._pending_binary_length) {
// Chunking is complete and the binary has been received
// in full.
binary_msg = this._full_binary.buffer;
} else {
// Wait for another chunk.
return;
}

let result = {
data: {
id: this._pending_binary,
data: binary_msg,
},
};

// make sure on_update callbacks are called with a `port_id`
// AND the transferred binary.
if (this._pending_port_id !== undefined) {
const new_data_with_port_id = {
port_id: this._pending_port_id,
delta: binary_msg,
};
result.data.data = new_data_with_port_id;
}

// Send the joined message to the client for handling.
this._handle(result);

// Reset flags to end special message flow.
delete this._pending_binary;
delete this._pending_binary_length;
delete this._pending_port_id;

this._total_chunk_length = 0;
this._full_binary = null;
} else {
msg = JSON.parse(msg.data);

// If the message has `binary_length` set,the worker expects the
// next message to be a binary message. This sets the
// `_pending_binary` flag, which triggers a special handler for
// the ArrayBuffer containing binary data.
if (msg.binary_length) {
this._pending_binary = msg.id;
this._pending_binary_length = msg.binary_length;

// Check whether the message also contains a `port_id`,
// indicating that we are in an `on_update` callback and
// the pending binary needs to be joined with the port_id
// for on_update handlers to work properly.
if (msg.data && msg.data.port_id !== undefined) {
this._pending_port_id = msg.data.port_id;
}

// Create an empty ArrayBuffer to hold the binary, as it
// will be sent in n >= 1 chunks.
this._full_binary = new Uint8Array(
this._pending_binary_length
);
} else {
this._handle({
data: msg,
});
}
}
};
this._ping();
this._ws.onclose = this._close.bind(this);
this._ws.onmessage = this._onmessage.bind(this);
}

/**
Expand All @@ -134,6 +140,11 @@ export class WebSocketClient extends Client {
* receiver.
*/
send(msg) {
if (this._ws.readyState === WebSocket.CLOSED) {
console.warn("Websocket connection is already closed.");
return;
}

if (
msg.args &&
msg.args.length > 0 &&
Expand All @@ -151,7 +162,7 @@ export class WebSocketClient extends Client {

terminate() {
return new Promise((resolve) => {
this._ws.onclose = resolve;
this._on_close_callback = resolve;
this._ws.close();
});
}
Expand Down
15 changes: 13 additions & 2 deletions packages/perspective/test/js/remote.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ let server;
let port;

describe("WebSocketManager", function () {
beforeAll(() => {
beforeEach(() => {
server = new perspective.WebSocketServer({ port: 0 });
port = server._server.address().port;
});

afterAll(() => {
afterEach(() => {
server.close();
});

Expand Down Expand Up @@ -226,4 +226,15 @@ describe("WebSocketManager", function () {
client_table.update(arrow, { port_id: update_port });
})();
});

it("disables ping loop on disconnect", async () => {
const data = [{ x: 1 }];
const table = await perspective.table(data);
server.host_table("test", table);
const client = perspective.websocket(`ws://localhost:${port}`);
const _client_table = await client.open_table("test");
expect(await _client_table.size()).toEqual(1);
await server.close();
expect(client._ping_loop).toBeUndefined();
});
});
18 changes: 7 additions & 11 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4625,16 +4625,7 @@
"@types/qs" "*"
"@types/range-parser" "*"

"@types/express-ws@^3.0.0":
version "3.0.0"
resolved "https://registry.yarnpkg.com/@types/express-ws/-/express-ws-3.0.0.tgz#89674edba2e9141916fc4d4d30fbd4f810e6b80b"
integrity sha512-GxsWec7Vp6h7sJuK0PwnZHeXNZnOwQn8kHAbCfvii66it5jXHTWzSg5cgHVtESwJfBLOe9SJ5wmM7C6gsDoyQw==
dependencies:
"@types/express" "*"
"@types/express-serve-static-core" "*"
"@types/ws" "*"

"@types/express@*", "@types/express@^4.17.3":
"@types/express@*":
version "4.17.11"
resolved "https://registry.yarnpkg.com/@types/express/-/express-4.17.11.tgz#debe3caa6f8e5fcda96b47bd54e2f40c4ee59545"
integrity sha512-no+R6rW60JEc59977wIxreQVsIEOAYwgCqldrA/vkpCnbD7MqTefO97lmoBe4WE0F156bC4uLSP1XHDOySnChg==
Expand Down Expand Up @@ -4990,7 +4981,7 @@
"@types/source-list-map" "*"
source-map "^0.6.1"

"@types/ws@*", "@types/ws@^7.2.2":
"@types/ws@^7.2.2":
version "7.4.0"
resolved "https://registry.yarnpkg.com/@types/ws/-/ws-7.4.0.tgz#499690ea08736e05a8186113dac37769ab251a0e"
integrity sha512-Y29uQ3Uy+58bZrFLhX36hcI3Np37nqWE7ky5tjiDoy1GDZnIwVxS0CgF+s+1bXMzjKBFy+fqaRfb708iNzdinw==
Expand Down Expand Up @@ -16840,6 +16831,11 @@ steno@^0.4.1:
dependencies:
graceful-fs "^4.1.3"

stoppable@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/stoppable/-/stoppable-1.1.0.tgz#32da568e83ea488b08e4d7ea2c3bcc9d75015d5b"
integrity sha512-KXDYZ9dszj6bzvnEMRYvxgeTHU74QBFL54XKtP3nyMuJ81CFYtABZ3bAzL2EdFUaEwJOBOgENyFj3R7oTzDyyw==

stream-connect@^1.0.2:
version "1.0.2"
resolved "https://registry.yarnpkg.com/stream-connect/-/stream-connect-1.0.2.tgz#18bc81f2edb35b8b5d9a8009200a985314428a97"
Expand Down

0 comments on commit 99ed875

Please sign in to comment.