From 688f3509df09017f60b43fa16cda7bbe5dbf89fc Mon Sep 17 00:00:00 2001 From: Laurin Quast Date: Sat, 17 Oct 2020 11:49:23 +0200 Subject: [PATCH 1/6] feat(execution): support returning AsyncIterableIterator from execute --- src/server.ts | 154 ++++++++++++++++++++++++-------------------- src/tests/server.ts | 81 +++++++++++++++++++++++ 2 files changed, 166 insertions(+), 69 deletions(-) diff --git a/src/server.ts b/src/server.ts index 18546ead..e1d4bc3b 100644 --- a/src/server.ts +++ b/src/server.ts @@ -79,7 +79,12 @@ export interface ServerOptions { * execute the subscription operation * upon. */ - execute: (args: ExecutionArgs) => Promise | ExecutionResult; + execute: ( + args: ExecutionArgs, + ) => + | Promise + | ExecutionResult + | AsyncIterableIterator; /** * Is the `subscribe` function * from GraphQL which is used to @@ -457,59 +462,65 @@ export function createServer( }); } - // perform - if (operationAST.operation === 'subscription') { - const subscriptionOrResult = await subscribe(execArgs); - if (isAsyncIterable(subscriptionOrResult)) { - // iterable subscriptions are distinct on ID - if (ctx.subscriptions[message.id]) { - return ctx.socket.close( - 4409, - `Subscriber for ${message.id} already exists`, - ); - } - ctx.subscriptions[message.id] = subscriptionOrResult; - - try { - for await (let result of subscriptionOrResult) { - // use the root formater first - if (formatExecutionResult) { - result = await formatExecutionResult(ctx, result); - } - // then use the subscription specific formatter - if (onSubscribeFormatter) { - result = await onSubscribeFormatter(ctx, result); - } - await sendMessage(ctx, { - id: message.id, - type: MessageType.Next, - payload: result, - }); - } + const asyncIterableHandler = async ( + asyncIterable: AsyncIterableIterator, + ) => { + // iterable subscriptions are distinct on ID + if (ctx.subscriptions[message.id]) { + return ctx.socket.close( + 4409, + `Subscriber for ${message.id} already exists`, + ); + } + ctx.subscriptions[message.id] = asyncIterable; - const completeMessage: CompleteMessage = { - id: message.id, - type: MessageType.Complete, - }; - await sendMessage(ctx, completeMessage); - if (onComplete) { - onComplete(ctx, completeMessage); + try { + for await (let result of asyncIterable) { + // use the root formater first + if (formatExecutionResult) { + result = await formatExecutionResult(ctx, result); } - } catch (err) { - await sendMessage(ctx, { + // then use the subscription specific formatter + if (onSubscribeFormatter) { + result = await onSubscribeFormatter(ctx, result); + } + await sendMessage(ctx, { id: message.id, - type: MessageType.Error, - payload: [ - new GraphQLError( - err instanceof Error - ? err.message - : new Error(err).message, - ), - ], + type: MessageType.Next, + payload: result, }); - } finally { - delete ctx.subscriptions[message.id]; } + + const completeMessage: CompleteMessage = { + id: message.id, + type: MessageType.Complete, + }; + await sendMessage(ctx, completeMessage); + if (onComplete) { + onComplete(ctx, completeMessage); + } + } catch (err) { + await sendMessage(ctx, { + id: message.id, + type: MessageType.Error, + payload: [ + new GraphQLError( + err instanceof Error + ? err.message + : new Error(err).message, + ), + ], + }); + } finally { + delete ctx.subscriptions[message.id]; + } + }; + + // perform + if (operationAST.operation === 'subscription') { + const subscriptionOrResult = await subscribe(execArgs); + if (isAsyncIterable(subscriptionOrResult)) { + await asyncIterableHandler(subscriptionOrResult); } else { let result = subscriptionOrResult; // use the root formater first @@ -539,27 +550,32 @@ export function createServer( // operationAST.operation === 'query' || 'mutation' let result = await execute(execArgs); - // use the root formater first - if (formatExecutionResult) { - result = await formatExecutionResult(ctx, result); - } - // then use the subscription specific formatter - if (onSubscribeFormatter) { - result = await onSubscribeFormatter(ctx, result); - } - await sendMessage(ctx, { - id: message.id, - type: MessageType.Next, - payload: result, - }); - const completeMessage: CompleteMessage = { - id: message.id, - type: MessageType.Complete, - }; - await sendMessage(ctx, completeMessage); - if (onComplete) { - onComplete(ctx, completeMessage); + if (isAsyncIterable(result)) { + await asyncIterableHandler(result); + } else { + // use the root formater first + if (formatExecutionResult) { + result = await formatExecutionResult(ctx, result); + } + // then use the subscription specific formatter + if (onSubscribeFormatter) { + result = await onSubscribeFormatter(ctx, result); + } + await sendMessage(ctx, { + id: message.id, + type: MessageType.Next, + payload: result, + }); + + const completeMessage: CompleteMessage = { + id: message.id, + type: MessageType.Complete, + }; + await sendMessage(ctx, completeMessage); + if (onComplete) { + onComplete(ctx, completeMessage); + } } } break; diff --git a/src/tests/server.ts b/src/tests/server.ts index 6a4adf03..0cfb299b 100644 --- a/src/tests/server.ts +++ b/src/tests/server.ts @@ -559,6 +559,87 @@ describe('Subscribe', () => { await wait(20); }); + it('should execute a query operation with custom execute that returns a AsyncIterableIterator, "next" the results and then "complete"', async () => { + expect.assertions(5); + + await makeServer({ + schema, + execute: async function* () { + for (const value of ['Hi', 'Hello', 'Sup']) { + yield { + data: { + getValue: value, + }, + }; + } + }, + }); + + const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); + client.onopen = () => { + client.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + }; + + let receivedNextCount = 0; + client.onmessage = ({ data }) => { + const message = parseMessage(data); + switch (message.type) { + case MessageType.ConnectionAck: + client.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + operationName: 'TestString', + query: `query TestString { + getValue + }`, + variables: {}, + }, + }), + ); + break; + case MessageType.Next: + receivedNextCount++; + if (receivedNextCount === 1) { + expect(message).toEqual({ + id: '1', + type: MessageType.Next, + payload: { data: { getValue: 'Hi' } }, + }); + } else if (receivedNextCount === 2) { + expect(message).toEqual({ + id: '1', + type: MessageType.Next, + payload: { data: { getValue: 'Hello' } }, + }); + } else if (receivedNextCount === 3) { + expect(message).toEqual({ + id: '1', + type: MessageType.Next, + payload: { data: { getValue: 'Sup' } }, + }); + } + break; + case MessageType.Complete: + expect(receivedNextCount).toEqual(3); + expect(message).toEqual({ + id: '1', + type: MessageType.Complete, + }); + break; + default: + fail(`Not supposed to receive a message of type ${message.type}`); + } + }; + + await wait(20); + }); + it('should execute the query of `DocumentNode` type, "next" the result and then "complete"', async () => { expect.assertions(3); From 1551ed2b447916631fb3a6bd931176c9f7f7502e Mon Sep 17 00:00:00 2001 From: Denis Badurina Date: Mon, 19 Oct 2020 12:54:46 +0200 Subject: [PATCH 2/6] docs: adjust protocol --- PROTOCOL.md | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/PROTOCOL.md b/PROTOCOL.md index c9bb7ffe..e7101c4a 100644 --- a/PROTOCOL.md +++ b/PROTOCOL.md @@ -75,7 +75,7 @@ Direction: **Client -> Server** Requests an operation specified in the message `payload`. This message provides a unique ID field to connect future server messages to the operation started by this message. -If there is already an active subscriber for a `subscription` operation matching the provided ID, the server will close the socket immediately with the event `4409: Subscriber for already exists`. Since `query` and `mutation` resolve to a single emitted value, their subscription does not require reservations for additional future events. Having this in mind, the server may not assert this rule for these operations. +If there is already an active subscriber for a live operation (any operation that emits **multiple** results) matching the provided ID, the server will close the socket immediately with the event `4409: Subscriber for already exists`. Operations resolving to a **single** emitted result do not require reservations for additional future events - having this in mind, the server may not assert this rule for such cases. ```typescript import { DocumentNode } from 'graphql'; @@ -97,10 +97,7 @@ Executing operations is allowed **only** after the server has acknowledged the c Direction: **Server -> Client** -Operation execution result message. - -- If the operation is a `query` or `mutation`, the message can be seen as the final execution result. This message is followed by the `Complete` message indicating the completion of the operation. -- If the operation is a `subscription`, the message can be seen as an event in the source stream requested by the `Subscribe` message. +Operation execution result(s) from the source stream created by the binding `Subscribe` message. After all results have been emitted, the `Complete` message will follow indicating stream completion. ```typescript import { ExecutionResult } from 'graphql'; @@ -134,7 +131,7 @@ Direction: **bidirectional** - **Server -> Client** indicates that the requested operation execution has completed. If the server dispatched the `Error` message relative to the original `Subscribe` message, no `Complete` message will be emitted. -- **Client -> Server** (for `subscription` operations only) indicating that the client has stopped listening to the events and wants to complete the source stream. No further data events, relevant to the original subscription, should be sent through. +- **Client -> Server** indicates that the client has stopped listening and wants to complete the source stream. No further events, relevant to the original subscription, should be sent through. ```typescript interface CompleteMessage { @@ -201,6 +198,19 @@ _The client and the server has already gone through [successful connection initi 1. _Server_ dispatches the `Complete` message with the matching unique ID indicating that the execution has completed 1. _Server_ triggers the `onComplete` callback, if specified +### Live Query operation + +_The client and the server has already gone through [successful connection initialisation](#successful-connection-initialisation)._ + +1. _Client_ generates a unique ID for the following operation +1. _Client_ dispatches the `Subscribe` message with the, previously generated, unique ID through the `id` field and the requested `live query` operation passed through the `payload` field +1. _Server_ triggers the `onSubscribe` callback, if specified, and uses the returned `ExecutionArgs` for the operation +1. _Server_ validates the request, establishes a GraphQL subscription on the `live query` and listens for data events in the source stream +1. _Server_ dispatches `Next` messages for every data event in the underlying `live query` source stream matching the client's unique ID +1. _Client_ stops the `live query` by dispatching a `Complete` message with the matching unique ID +1. _Server_ effectively stops the GraphQL subscription by completing/disposing the underlying source stream and cleaning up related resources +1. _Server_ triggers the `onComplete` callback, if specified + ### Subscribe operation _The client and the server has already gone through [successful connection initialisation](#successful-connection-initialisation)._ From b1f87810ae65619518c0b68a004284941228085f Mon Sep 17 00:00:00 2001 From: Denis Badurina Date: Mon, 19 Oct 2020 12:58:52 +0200 Subject: [PATCH 3/6] style: shorter wording --- src/tests/server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/server.ts b/src/tests/server.ts index 0cfb299b..2af31af0 100644 --- a/src/tests/server.ts +++ b/src/tests/server.ts @@ -559,7 +559,7 @@ describe('Subscribe', () => { await wait(20); }); - it('should execute a query operation with custom execute that returns a AsyncIterableIterator, "next" the results and then "complete"', async () => { + it('should execute the live query, "next" multiple results and then "complete"', async () => { expect.assertions(5); await makeServer({ From a6e599801d9774821b345ae0fe20ca496fe7dae3 Mon Sep 17 00:00:00 2001 From: Denis Badurina Date: Mon, 19 Oct 2020 13:08:16 +0200 Subject: [PATCH 4/6] refactor: clearer distinction --- src/server.ts | 110 ++++++++++++++++++++------------------------------ 1 file changed, 44 insertions(+), 66 deletions(-) diff --git a/src/server.ts b/src/server.ts index e1d4bc3b..59eb3cf1 100644 --- a/src/server.ts +++ b/src/server.ts @@ -462,9 +462,17 @@ export function createServer( }); } - const asyncIterableHandler = async ( - asyncIterable: AsyncIterableIterator, - ) => { + // perform + let iterableOrResult; + if (operationAST.operation === 'subscription') { + iterableOrResult = await subscribe(execArgs); + } else { + // operationAST.operation === 'query' || 'mutation' + iterableOrResult = await execute(execArgs); + } + if (isAsyncIterable(iterableOrResult)) { + /** multiple emitted results */ + // iterable subscriptions are distinct on ID if (ctx.subscriptions[message.id]) { return ctx.socket.close( @@ -472,10 +480,10 @@ export function createServer( `Subscriber for ${message.id} already exists`, ); } - ctx.subscriptions[message.id] = asyncIterable; + ctx.subscriptions[message.id] = iterableOrResult; try { - for await (let result of asyncIterable) { + for await (let result of iterableOrResult) { // use the root formater first if (formatExecutionResult) { result = await formatExecutionResult(ctx, result); @@ -484,13 +492,14 @@ export function createServer( if (onSubscribeFormatter) { result = await onSubscribeFormatter(ctx, result); } + // emit await sendMessage(ctx, { id: message.id, type: MessageType.Next, payload: result, }); } - + // source stream completed const completeMessage: CompleteMessage = { id: message.id, type: MessageType.Complete, @@ -514,68 +523,37 @@ export function createServer( } finally { delete ctx.subscriptions[message.id]; } - }; - - // perform - if (operationAST.operation === 'subscription') { - const subscriptionOrResult = await subscribe(execArgs); - if (isAsyncIterable(subscriptionOrResult)) { - await asyncIterableHandler(subscriptionOrResult); - } else { - let result = subscriptionOrResult; - // use the root formater first - if (formatExecutionResult) { - result = await formatExecutionResult(ctx, result); - } - // then use the subscription specific formatter - if (onSubscribeFormatter) { - result = await onSubscribeFormatter(ctx, result); - } - await sendMessage(ctx, { - id: message.id, - type: MessageType.Next, - payload: result, - }); - - const completeMessage: CompleteMessage = { - id: message.id, - type: MessageType.Complete, - }; - await sendMessage(ctx, completeMessage); - if (onComplete) { - onComplete(ctx, completeMessage); - } - } } else { - // operationAST.operation === 'query' || 'mutation' - - let result = await execute(execArgs); - - if (isAsyncIterable(result)) { - await asyncIterableHandler(result); - } else { - // use the root formater first - if (formatExecutionResult) { - result = await formatExecutionResult(ctx, result); - } - // then use the subscription specific formatter - if (onSubscribeFormatter) { - result = await onSubscribeFormatter(ctx, result); - } - await sendMessage(ctx, { - id: message.id, - type: MessageType.Next, - payload: result, - }); + /** single emitted result */ - const completeMessage: CompleteMessage = { - id: message.id, - type: MessageType.Complete, - }; - await sendMessage(ctx, completeMessage); - if (onComplete) { - onComplete(ctx, completeMessage); - } + // use the root formater first + if (formatExecutionResult) { + iterableOrResult = await formatExecutionResult( + ctx, + iterableOrResult, + ); + } + // then use the subscription specific formatter + if (onSubscribeFormatter) { + iterableOrResult = await onSubscribeFormatter( + ctx, + iterableOrResult, + ); + } + // emit + await sendMessage(ctx, { + id: message.id, + type: MessageType.Next, + payload: iterableOrResult, + }); + // resolved + const completeMessage: CompleteMessage = { + id: message.id, + type: MessageType.Complete, + }; + await sendMessage(ctx, completeMessage); + if (onComplete) { + onComplete(ctx, completeMessage); } } break; From b7d42b95cabc1bacae066b735acae7b0dfc2eb80 Mon Sep 17 00:00:00 2001 From: Denis Badurina Date: Mon, 19 Oct 2020 13:11:07 +0200 Subject: [PATCH 5/6] refactor: execute can promise an iterator --- src/server.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/server.ts b/src/server.ts index 59eb3cf1..e03a29c2 100644 --- a/src/server.ts +++ b/src/server.ts @@ -82,9 +82,9 @@ export interface ServerOptions { execute: ( args: ExecutionArgs, ) => - | Promise - | ExecutionResult - | AsyncIterableIterator; + | Promise | ExecutionResult> + | AsyncIterableIterator + | ExecutionResult; /** * Is the `subscribe` function * from GraphQL which is used to From 99c9afaf8e397ced3bf58a2db37a86f9d48f5cfc Mon Sep 17 00:00:00 2001 From: Denis Badurina Date: Mon, 19 Oct 2020 13:12:45 +0200 Subject: [PATCH 6/6] docs: differentiate `execute` and `subscribe` --- src/server.ts | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/server.ts b/src/server.ts index e03a29c2..bbbcf88a 100644 --- a/src/server.ts +++ b/src/server.ts @@ -74,10 +74,8 @@ export interface ServerOptions { >; }; /** - * Is the `subscribe` function - * from GraphQL which is used to - * execute the subscription operation - * upon. + * Is the `execute` function from GraphQL which is + * used to execute the query/mutation operation. */ execute: ( args: ExecutionArgs, @@ -86,10 +84,8 @@ export interface ServerOptions { | AsyncIterableIterator | ExecutionResult; /** - * Is the `subscribe` function - * from GraphQL which is used to - * execute the subscription operation - * upon. + * Is the `subscribe` function from GraphQL which is + * used to execute the subscription operation. */ subscribe: ( args: ExecutionArgs,