Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Use fastify-websocket #200

Merged
merged 5 commits into from
Jun 6, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
test: add fastify-websocket tserver
  • Loading branch information
enisdenjo committed Jun 6, 2021
commit b3aef905253d8cb656618e31846fd9063fcd382f
14 changes: 14 additions & 0 deletions src/tests/use.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import http from 'http';
import ws from 'ws';
import stream from 'stream';
import fastify, { FastifyRequest } from 'fastify';
import { SocketStream as FastifySocketStream } from 'fastify-websocket';
import {
MessageType,
stringifyMessage,
Expand All @@ -12,6 +15,7 @@ import {
tServers,
WSExtra,
UWSExtra,
FastifyExtra,
waitForDone,
} from './utils';

Expand Down Expand Up @@ -108,6 +112,16 @@ for (const { tServer, startTServer } of tServers) {
expect((ctx.extra as WSExtra).request).toBeInstanceOf(
http.IncomingMessage,
);
} else if (tServer === 'fastify-websocket') {
expect((ctx.extra as FastifyExtra).connection).toBeInstanceOf(
stream.Duplex,
);
expect(
(ctx.extra as FastifyExtra).connection.socket,
).toBeInstanceOf(ws);
expect((ctx.extra as FastifyExtra).request.constructor.name).toBe(
'Request',
);
} else {
throw new Error('Missing test case for ' + tServer);
}
Expand Down
210 changes: 208 additions & 2 deletions src/tests/utils/tservers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@ import { ServerOptions, Context } from '../../server';

import ws from 'ws';
import uWS from 'uWebSockets.js';
import createFastify from 'fastify';
import fastifyWebsocket from 'fastify-websocket';

import { useServer as useWSServer, Extra as WSExtra } from '../../use/ws';
import {
makeBehavior as makeUWSBehavior,
Extra as UWSExtra,
} from '../../use/uWebSockets';
export { WSExtra, UWSExtra };
import {
makeHandler as makeFastifyHandler,
Extra as FastifyExtra,
} from '../../use/fastify-websocket';
export { WSExtra, UWSExtra, FastifyExtra };

// distinct server for each test; if you forget to dispose, the fixture wont
const leftovers: Dispose[] = [];
Expand All @@ -38,7 +44,7 @@ export interface TServer {
expire?: number,
) => Promise<void>;
waitForConnect: (
test?: (ctx: Context<WSExtra | UWSExtra>) => void,
test?: (ctx: Context<WSExtra | UWSExtra | FastifyExtra>) => void,
expire?: number,
) => Promise<void>;
waitForOperation: (test?: () => void, expire?: number) => Promise<void>;
Expand Down Expand Up @@ -407,17 +413,217 @@ export async function startUWSTServer(
};
}

export async function startFastifyWSTServer(
options: Partial<ServerOptions> = {},
keepAlive?: number, // for ws tests sake
): Promise<TServer> {
const path = '/simple';
const emitter = new EventEmitter();
const port = await getAvailablePort();

const fastify = createFastify();
fastify.register(fastifyWebsocket);

// sockets to kick off on teardown
const sockets = new Set<ws>();

const pendingConnections: Context<FastifyExtra>[] = [];
const pendingClients: TServerClient[] = [];
let pendingOperations = 0,
pendingCompletes = 0,
pendingCloses = 0;

function toClient(socket: ws): TServerClient {
return {
onMessage: (cb) => {
socket.on('message', cb);
return () => socket.off('message', cb);
},
close: (...args) => socket.close(...args),
};
}

fastify.get(path, { websocket: true }, (connection, request) => {
sockets.add(connection.socket);
pendingClients.push(toClient(connection.socket));
connection.socket.once('close', () => {
sockets.delete(connection.socket);
pendingCloses++;
emitter.emit('close');
});

makeFastifyHandler(
{
schema,
...options,
onConnect: async (...args) => {
pendingConnections.push(args[0]);
const permitted = await options?.onConnect?.(...args);
emitter.emit('conn');
return permitted;
},
onOperation: async (ctx, msg, args, result) => {
pendingOperations++;
const maybeResult = await options?.onOperation?.(
ctx,
msg,
args,
result,
);
emitter.emit('operation');
return maybeResult;
},
onComplete: async (...args) => {
pendingCompletes++;
await options?.onComplete?.(...args);
emitter.emit('compl');
},
},
keepAlive,
).call(fastify, connection, request);
});

const dispose: Dispose = (beNice) => {
return new Promise((resolve, reject) => {
for (const socket of sockets) {
if (beNice) socket.close(1001, 'Going away');
else socket.terminate();
sockets.delete(socket);
}

fastify.websocketServer.close((err) => {
if (err) return reject(err);
fastify.close(() => {
leftovers.splice(leftovers.indexOf(dispose), 1);
resolve();
});
});
});
};
leftovers.push(dispose);

await new Promise<void>((resolve, reject) => {
fastify.listen(port, (err) => {
if (err) return reject(err);
resolve();
});
});

return {
url: `ws://localhost:${port}${path}`,
getClients() {
return Array.from(fastify.websocketServer.clients, toClient);
},
waitForClient(test, expire) {
return new Promise((resolve) => {
function done() {
// the on connect listener below will be called before our listener, populating the queue
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const client = pendingClients.shift()!;
test?.(client);
resolve();
}
if (pendingClients.length > 0) return done();
fastify.websocketServer.once('connection', done);
if (expire)
setTimeout(() => {
fastify.websocketServer.off('connection', done); // expired
resolve();
}, expire);
});
},
waitForClientClose(test, expire) {
return new Promise((resolve) => {
function done() {
pendingCloses--;
test?.();
resolve();
}
if (pendingCloses > 0) return done();

emitter.once('close', done);
if (expire)
setTimeout(() => {
emitter.off('close', done); // expired
resolve();
}, expire);
});
},
pong,
waitForConnect(test, expire) {
return new Promise((resolve) => {
function done() {
// the on connect listener below will be called before our listener, populating the queue
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const ctx = pendingConnections.shift()!;
test?.(ctx);
resolve();
}
if (pendingConnections.length > 0) return done();
emitter.once('conn', done);
if (expire)
setTimeout(() => {
emitter.off('conn', done); // expired
resolve();
}, expire);
});
},
waitForOperation(test, expire) {
return new Promise((resolve) => {
function done() {
pendingOperations--;
test?.();
resolve();
}
if (pendingOperations > 0) return done();
emitter.once('operation', done);
if (expire)
setTimeout(() => {
emitter.off('operation', done); // expired
resolve();
}, expire);
});
},
waitForComplete(test, expire) {
return new Promise((resolve) => {
function done() {
pendingCompletes--;
test?.();
resolve();
}
if (pendingCompletes > 0) return done();
emitter.once('compl', done);
if (expire)
setTimeout(() => {
emitter.off('compl', done); // expired
resolve();
}, expire);
});
},
dispose,
};
}

export const tServers = [
{
tServer: 'ws' as const,
startTServer: startWSTServer,
itForWS: it,
itForUWS: it.skip,
itForFastify: it.skip,
},
{
tServer: 'uWebSockets.js' as const,
startTServer: startUWSTServer,
itForWS: it.skip,
itForUWS: it,
itForFastify: it.skip,
},
{
tServer: 'fastify-websocket' as const,
startTServer: startFastifyWSTServer,
itForWS: it.skip,
itForUWS: it.skip,
itForFastify: it,
},
];