Skip to content

Commit

Permalink
feat: enable fastify-sse style api (#41)
Browse files Browse the repository at this point in the history
* update deps

* add support for fastify-sse style api

* update ci/cd

* update readme
  • Loading branch information
mpetrunic authored Jun 6, 2022
1 parent b81a45c commit 538f67b
Show file tree
Hide file tree
Showing 13 changed files with 221 additions and 157 deletions.
54 changes: 45 additions & 9 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
name: CI check

on: [push, pull_request]
on:
push:
branches:
- master
pull_request:
branches:
- '**'

jobs:
build:

test:
runs-on: ubuntu-latest

strategy:
matrix:
node-version: [10.x, 12.x]

node-version: [14, 16]
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}
- run: yarn install --frozen-lockfile
Expand All @@ -23,4 +26,37 @@ jobs:
- run: yarn run check-types
- run: yarn run test
env:
CI: true
CI: true

maybe-release:
name: release
if: github.event_name == 'push' && github.ref == 'refs/heads/master'
runs-on: ubuntu-latest
needs: [test]
steps:
- uses: google-github-actions/release-please-action@v3
id: release
with:
release-type: node
package-name: release-please-action
changelog-types: '[{"type":"feat","section":"Features","hidden":false},{"type":"fix","section":"Bug Fixes","hidden":false},{"type":"chore","section":"Miscellaneous","hidden":false}]'

- uses: actions/checkout@v3
if: ${{ steps.release.outputs.release_created }}

- uses: actions/setup-node@v3
with:
node-version: 16
registry-url: 'https://registry.npmjs.org'
if: ${{ steps.release.outputs.release_created }}

- run: yarn install
if: ${{ steps.release.outputs.release_created }}

- run: yarn build
if: ${{ steps.release.outputs.release_created }}

- run: npm publish
env:
NODE_AUTH_TOKEN: ${{secrets.NPM_AUTH_TOKEN}}
if: ${{ steps.release.outputs.release_created }}
24 changes: 24 additions & 0 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: "Semantic PR"

on:
pull_request_target:
types:
- opened
- edited
- synchronize

jobs:
main:
name: Validate PR title
runs-on: ubuntu-latest
steps:
- uses: amannn/action-semantic-pull-request@v4
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
types: |
fix
feat
chore
validateSingleCommit: true #single commit can ovveride squash merge commit message
validateSingleCommitMatchesPrTitle: false
81 changes: 0 additions & 81 deletions .github/workflows/publish.yaml

This file was deleted.

2 changes: 1 addition & 1 deletion .nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
12.4.0
16
33 changes: 18 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,33 @@ server.get("/", function (req, res) {
})());
});
```
#### Sending individual events

##### Sending events from EventEmmiters

Using EventIterator dependency:
```javascript
import {FastifySSEPlugin} from "fastify-sse-v2";
import EventIterator from "event-iterator";

const server = fastify();
server.register(FastifySSEPlugin);

server.get("/", function (req, res) {
const eventEmitter = new EventEmitter();
res.sse(new EventIterator(
(push) => {
eventEmitter.on("some_event", push)
return () => eventEmitter.removeEventListener("some_event", push)
}
)
);
server.get("/", async function (req, res) {
for (let i = 0; i < 10; i++) {
await sleep(2000);
res.sse({id: String(i), data: "Some message"});
}
});

fastify.get('/listenForChanges', {}, (request, reply) => {
const listenStream = fastify.db.watch('doc-uuid')
.on('data', (data)=>reply.sse({ data: JSON.stringify(data) }))
.on('delete', () => reply.sse({ event: 'close' })
request.socket.on('close', ()=>listenStream.end())
})
```
Without additional dependency ([not supported in all nodejs versions](https://nodejs.org/api/events.html#events_events_on_emitter_eventname_options):
##### Sending events from EventEmmiters
* [not supported in all nodejs versions](https://nodejs.org/api/events.html#events_events_on_emitter_eventname_options)
```javascript
import {FastifySSEPlugin} from "fastify-sse-v2";
import {on} from "events";
Expand All @@ -70,7 +73,7 @@ server.register(FastifySSEPlugin);
server.get("/", function (req, res) {
res.sse(
(async function* () {
for await (const event of on(eventEmmitter, "update")) {
for await (const [event] of on(eventEmmitter, "update")) {
yield {
type: event.name,
data: JSON.stringify(event),
Expand Down
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@
"eslint-plugin-import": "^2.18.2",
"eventsource": "^1.0.7",
"fastify": "3.18.1",
"it-pushable": "^1.4.0",
"mocha": "^8.3.2",
"sinon": "^9.0.2",
"ts-node": "^8.3.0",
"typescript": "^3.5.1"
},
"dependencies": {
"fastify-plugin": "2.0.1",
"it-to-stream": "^0.1.1"
"fastify-plugin": "2.3.4",
"it-to-stream": "^1.0.0",
"it-pushable": "^1.4.2"
},
"peerDependencies": {
"fastify": "3.x"
"fastify": ">=3"
}
}
4 changes: 3 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import fastifyPlugin from "fastify-plugin";
import {Pushable} from "it-pushable";
import {plugin} from "./plugin";

export const FastifySSEPlugin = fastifyPlugin(plugin, {
Expand Down Expand Up @@ -33,7 +34,8 @@ declare module "fastify" {
}

interface FastifyReply {
sse(source: AsyncIterable<EventMessage>): void;
sseContext: {source: Pushable<EventMessage>};
sse(source: AsyncIterable<EventMessage> | EventMessage): void;
}
}

Expand Down
38 changes: 28 additions & 10 deletions src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,38 @@ import {EventMessage, FastifyPluginAsync, FastifyReply} from "fastify";
import {SsePluginOptions} from "./types";
import {serializeSSEEvent, transformAsyncIterable} from "./sse";
import toStream from "it-to-stream";
import {isAsyncIterable} from "./util";
import pushable from "it-pushable";

export const plugin: FastifyPluginAsync<SsePluginOptions> =
async function (instance, options): Promise<void> {
instance.decorateReply(
"sse",
function (this: FastifyReply, source: AsyncIterable<EventMessage>): void {
Object.entries(this.getHeaders()).forEach(([key, value]) => {
this.raw.setHeader(key, value);
});
this.raw.setHeader("Content-Type","text/event-stream");
this.raw.setHeader("Connection", "keep-alive");
this.raw.setHeader("Cache-Control", "no-cache,no-transform");
this.raw.setHeader("x-no-compression", 1);
this.raw.write(serializeSSEEvent({retry: options.retryDelay || 3000}));
toStream(transformAsyncIterable(source)).pipe(this.raw);
function (this: FastifyReply, source: AsyncIterable<EventMessage> | EventMessage): void {

//if this already set, it's not first event
if(!this.raw.headersSent) {
this.sseContext= {source: pushable<EventMessage>()};
Object.entries(this.getHeaders()).forEach(([key, value]) => {
this.raw.setHeader(key, value);
});
this.raw.setHeader("Content-Type","text/event-stream");
this.raw.setHeader("Connection", "keep-alive");
this.raw.setHeader("Cache-Control", "no-cache,no-transform");
this.raw.setHeader("x-no-compression", 1);
this.raw.write(serializeSSEEvent({retry: options.retryDelay || 3000}));
handleAsyncIterable(this, this.sseContext.source);
}
if(isAsyncIterable(source)) {
return handleAsyncIterable(this, source);
} else {
this.sseContext.source.push(source);
return;
}
});
};


function handleAsyncIterable(reply: FastifyReply, source: AsyncIterable<EventMessage>): void {
toStream(transformAsyncIterable(source)).pipe(reply.raw);
}
6 changes: 6 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export function isAsyncIterable<T extends AsyncIterable<unknown>>(source: T | unknown): source is T {
if (source === null || source === undefined || typeof source !== "object") {
return false;
}
return Symbol.asyncIterator in source;
}
27 changes: 25 additions & 2 deletions test/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {expect} from "chai";
import {FastifyInstance, EventMessage} from "fastify";
import {FastifyInstance, EventMessage, RouteHandler} from "fastify";
import {getEventSource, getFastifyServer, getBaseUrl} from "./utils";
import pushable, {Pushable} from "it-pushable";
import sinon from "sinon";
Expand Down Expand Up @@ -76,7 +76,7 @@ describe("Test SSE plugin", function () {

it("should send single event", function (done) {
const eventsource = getEventSource(server);
source.push({id: "1", event: "message", data: "Something"});
source.push({data: "Something", id: "1", event: "message"});
eventsource.onmessage = (evt => {
expect(evt.data).equal("Something");
expect(evt.type).equal("message");
Expand All @@ -87,6 +87,29 @@ describe("Test SSE plugin", function () {

});

it("should send multiple events without async iterable", function (done) {
const handler: RouteHandler = async (req, resp): Promise<void> => {
for await( const event of source) {
resp.sse(event);
return resp;
}

};
getFastifyServer(handler).then((server2) => {
const eventsource = getEventSource(server2);
source.push({id: "1", event: "message", data: "Something"});
eventsource.onmessage = (evt => {
expect(evt.data).equal("Something");
expect(evt.type).equal("message");
expect(evt.lastEventId).equal("1");
eventsource.close();
server2.close();
done();
});
});

});

it("should send multiple events", function (done) {
const eventsource = getEventSource(server);
source.push({id: "1", event: "message", data: "Something"});
Expand Down
Loading

0 comments on commit 538f67b

Please sign in to comment.