Skip to content

Commit

Permalink
feat(server): Support returning multiple results from execute (enis…
Browse files Browse the repository at this point in the history
…denjo#28)

* feat(execution): support returning AsyncIterableIterator from execute

* docs: adjust protocol

* style: shorter wording

* refactor: clearer distinction

* refactor: execute can promise an iterator

* docs: differentiate `execute` and `subscribe`

Co-authored-by: Denis Badurina <denis@domonda.com>
  • Loading branch information
n1ru4l and enisdenjo authored Oct 19, 2020
1 parent 65e073d commit dbbd88b
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 83 deletions.
22 changes: 16 additions & 6 deletions PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Direction: **Client -> Server**

Requests an operation specified in the message `payload`. This message provides a unique ID field to connect future server messages to the operation started by this message.

If there is already an active subscriber for a `subscription` operation matching the provided ID, the server will close the socket immediately with the event `4409: Subscriber for <unique-operation-id> already exists`. Since `query` and `mutation` resolve to a single emitted value, their subscription does not require reservations for additional future events. Having this in mind, the server may not assert this rule for these operations.
If there is already an active subscriber for a live operation (any operation that emits **multiple** results) matching the provided ID, the server will close the socket immediately with the event `4409: Subscriber for <unique-operation-id> already exists`. Operations resolving to a **single** emitted result do not require reservations for additional future events - having this in mind, the server may not assert this rule for such cases.

```typescript
import { DocumentNode } from 'graphql';
Expand All @@ -97,10 +97,7 @@ Executing operations is allowed **only** after the server has acknowledged the c

Direction: **Server -> Client**

Operation execution result message.

- If the operation is a `query` or `mutation`, the message can be seen as the final execution result. This message is followed by the `Complete` message indicating the completion of the operation.
- If the operation is a `subscription`, the message can be seen as an event in the source stream requested by the `Subscribe` message.
Operation execution result(s) from the source stream created by the binding `Subscribe` message. After all results have been emitted, the `Complete` message will follow indicating stream completion.

```typescript
import { ExecutionResult } from 'graphql';
Expand Down Expand Up @@ -134,7 +131,7 @@ Direction: **bidirectional**

- **Server -> Client** indicates that the requested operation execution has completed. If the server dispatched the `Error` message relative to the original `Subscribe` message, no `Complete` message will be emitted.

- **Client -> Server** (for `subscription` operations only) indicating that the client has stopped listening to the events and wants to complete the source stream. No further data events, relevant to the original subscription, should be sent through.
- **Client -> Server** indicates that the client has stopped listening and wants to complete the source stream. No further events, relevant to the original subscription, should be sent through.

```typescript
interface CompleteMessage {
Expand Down Expand Up @@ -201,6 +198,19 @@ _The client and the server has already gone through [successful connection initi
1. _Server_ dispatches the `Complete` message with the matching unique ID indicating that the execution has completed
1. _Server_ triggers the `onComplete` callback, if specified

### Live Query operation

_The client and the server has already gone through [successful connection initialisation](#successful-connection-initialisation)._

1. _Client_ generates a unique ID for the following operation
1. _Client_ dispatches the `Subscribe` message with the, previously generated, unique ID through the `id` field and the requested `live query` operation passed through the `payload` field
1. _Server_ triggers the `onSubscribe` callback, if specified, and uses the returned `ExecutionArgs` for the operation
1. _Server_ validates the request, establishes a GraphQL subscription on the `live query` and listens for data events in the source stream
1. _Server_ dispatches `Next` messages for every data event in the underlying `live query` source stream matching the client's unique ID
1. _Client_ stops the `live query` by dispatching a `Complete` message with the matching unique ID
1. _Server_ effectively stops the GraphQL subscription by completing/disposing the underlying source stream and cleaning up related resources
1. _Server_ triggers the `onComplete` callback, if specified

### Subscribe operation

_The client and the server has already gone through [successful connection initialisation](#successful-connection-initialisation)._
Expand Down
144 changes: 67 additions & 77 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,18 @@ export interface ServerOptions {
>;
};
/**
* Is the `subscribe` function
* from GraphQL which is used to
* execute the subscription operation
* upon.
* Is the `execute` function from GraphQL which is
* used to execute the query/mutation operation.
*/
execute: (args: ExecutionArgs) => Promise<ExecutionResult> | ExecutionResult;
execute: (
args: ExecutionArgs,
) =>
| Promise<AsyncIterableIterator<ExecutionResult> | ExecutionResult>
| AsyncIterableIterator<ExecutionResult>
| ExecutionResult;
/**
* Is the `subscribe` function
* from GraphQL which is used to
* execute the subscription operation
* upon.
* Is the `subscribe` function from GraphQL which is
* used to execute the subscription operation.
*/
subscribe: (
args: ExecutionArgs,
Expand Down Expand Up @@ -458,74 +459,43 @@ export function createServer(
}

// perform
let iterableOrResult;
if (operationAST.operation === 'subscription') {
const subscriptionOrResult = await subscribe(execArgs);
if (isAsyncIterable(subscriptionOrResult)) {
// iterable subscriptions are distinct on ID
if (ctx.subscriptions[message.id]) {
return ctx.socket.close(
4409,
`Subscriber for ${message.id} already exists`,
);
}
ctx.subscriptions[message.id] = subscriptionOrResult;

try {
for await (let result of subscriptionOrResult) {
// use the root formater first
if (formatExecutionResult) {
result = await formatExecutionResult(ctx, result);
}
// then use the subscription specific formatter
if (onSubscribeFormatter) {
result = await onSubscribeFormatter(ctx, result);
}
await sendMessage<MessageType.Next>(ctx, {
id: message.id,
type: MessageType.Next,
payload: result,
});
}
iterableOrResult = await subscribe(execArgs);
} else {
// operationAST.operation === 'query' || 'mutation'
iterableOrResult = await execute(execArgs);
}
if (isAsyncIterable(iterableOrResult)) {
/** multiple emitted results */

// iterable subscriptions are distinct on ID
if (ctx.subscriptions[message.id]) {
return ctx.socket.close(
4409,
`Subscriber for ${message.id} already exists`,
);
}
ctx.subscriptions[message.id] = iterableOrResult;

const completeMessage: CompleteMessage = {
id: message.id,
type: MessageType.Complete,
};
await sendMessage<MessageType.Complete>(ctx, completeMessage);
if (onComplete) {
onComplete(ctx, completeMessage);
try {
for await (let result of iterableOrResult) {
// use the root formater first
if (formatExecutionResult) {
result = await formatExecutionResult(ctx, result);
}
// then use the subscription specific formatter
if (onSubscribeFormatter) {
result = await onSubscribeFormatter(ctx, result);
}
} catch (err) {
await sendMessage<MessageType.Error>(ctx, {
// emit
await sendMessage<MessageType.Next>(ctx, {
id: message.id,
type: MessageType.Error,
payload: [
new GraphQLError(
err instanceof Error
? err.message
: new Error(err).message,
),
],
type: MessageType.Next,
payload: result,
});
} finally {
delete ctx.subscriptions[message.id];
}
} else {
let result = subscriptionOrResult;
// use the root formater first
if (formatExecutionResult) {
result = await formatExecutionResult(ctx, result);
}
// then use the subscription specific formatter
if (onSubscribeFormatter) {
result = await onSubscribeFormatter(ctx, result);
}
await sendMessage<MessageType.Next>(ctx, {
id: message.id,
type: MessageType.Next,
payload: result,
});

// source stream completed
const completeMessage: CompleteMessage = {
id: message.id,
type: MessageType.Complete,
Expand All @@ -534,25 +504,45 @@ export function createServer(
if (onComplete) {
onComplete(ctx, completeMessage);
}
} catch (err) {
await sendMessage<MessageType.Error>(ctx, {
id: message.id,
type: MessageType.Error,
payload: [
new GraphQLError(
err instanceof Error
? err.message
: new Error(err).message,
),
],
});
} finally {
delete ctx.subscriptions[message.id];
}
} else {
// operationAST.operation === 'query' || 'mutation'
/** single emitted result */

let result = await execute(execArgs);
// use the root formater first
if (formatExecutionResult) {
result = await formatExecutionResult(ctx, result);
iterableOrResult = await formatExecutionResult(
ctx,
iterableOrResult,
);
}
// then use the subscription specific formatter
if (onSubscribeFormatter) {
result = await onSubscribeFormatter(ctx, result);
iterableOrResult = await onSubscribeFormatter(
ctx,
iterableOrResult,
);
}
// emit
await sendMessage<MessageType.Next>(ctx, {
id: message.id,
type: MessageType.Next,
payload: result,
payload: iterableOrResult,
});

// resolved
const completeMessage: CompleteMessage = {
id: message.id,
type: MessageType.Complete,
Expand Down
81 changes: 81 additions & 0 deletions src/tests/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,87 @@ describe('Subscribe', () => {
await wait(20);
});

it('should execute the live query, "next" multiple results and then "complete"', async () => {
expect.assertions(5);

await makeServer({
schema,
execute: async function* () {
for (const value of ['Hi', 'Hello', 'Sup']) {
yield {
data: {
getValue: value,
},
};
}
},
});

const client = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL);
client.onopen = () => {
client.send(
stringifyMessage<MessageType.ConnectionInit>({
type: MessageType.ConnectionInit,
}),
);
};

let receivedNextCount = 0;
client.onmessage = ({ data }) => {
const message = parseMessage(data);
switch (message.type) {
case MessageType.ConnectionAck:
client.send(
stringifyMessage<MessageType.Subscribe>({
id: '1',
type: MessageType.Subscribe,
payload: {
operationName: 'TestString',
query: `query TestString {
getValue
}`,
variables: {},
},
}),
);
break;
case MessageType.Next:
receivedNextCount++;
if (receivedNextCount === 1) {
expect(message).toEqual({
id: '1',
type: MessageType.Next,
payload: { data: { getValue: 'Hi' } },
});
} else if (receivedNextCount === 2) {
expect(message).toEqual({
id: '1',
type: MessageType.Next,
payload: { data: { getValue: 'Hello' } },
});
} else if (receivedNextCount === 3) {
expect(message).toEqual({
id: '1',
type: MessageType.Next,
payload: { data: { getValue: 'Sup' } },
});
}
break;
case MessageType.Complete:
expect(receivedNextCount).toEqual(3);
expect(message).toEqual({
id: '1',
type: MessageType.Complete,
});
break;
default:
fail(`Not supposed to receive a message of type ${message.type}`);
}
};

await wait(20);
});

it('should execute the query of `DocumentNode` type, "next" the result and then "complete"', async () => {
expect.assertions(3);

Expand Down

0 comments on commit dbbd88b

Please sign in to comment.