Skip to content

Commit

Permalink
Multiplex multiple virtual query streams through a single web socket (d…
Browse files Browse the repository at this point in the history
…igital-asset#10221)

* Initial work-in-progress

changelog_begin
[TS bindings] When using daml-react and daml-ledger, all streaming request to the query endpoint will be multiplexed through a single web socket.
changelog_end

* Minor touches, renaming

* Handle offsets received from the JSON API

* Create state array once per downstream consumer

* Handle reconnections as streamSubmit does

* Don't share mutable state between events

* Fix compilation errors

* Remove language-support/ts/package-lock.json

* Remove --downlevelIteration option, materialize iterators, fix one test

* I will refrain from commenting on the meaning of `this` in JavaScript

* WebSocket does not have a removeAllListeners method

* Remove unnecessary docstring from `handleQueriesChange`

* Address digital-asset#10221 (comment)

* Address digital-asset#10221 (comment)

* Address digital-asset#10221 (comment)

* Address digital-asset#10221 (comment)

* Fix some failing tests and linting fixes

* fix reconnect on server close test

* fix failing tests in test.ts and create-daml-app tests

* Update language-support/ts/daml-ledger/index.ts

Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>

* Update language-support/ts/daml-ledger/index.ts

Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>

* Update language-support/ts/daml-ledger/index.test.ts

Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>

* Update language-support/ts/daml-ledger/index.test.ts

Co-authored-by: Stefano Baghino <43749967+stefanobaghino-da@users.noreply.github.com>

* changes based on code review comments

Co-authored-by: Akshay <akshay.shirahatti@digitalasset.com>
Co-authored-by: akshayshirahatti-da <86774832+akshayshirahatti-da@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 29, 2021
1 parent 3098b70 commit 05505e3
Show file tree
Hide file tree
Showing 4 changed files with 448 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ beforeAll(async () => {
'--port-file', SANDBOX_PORT_FILE,
'--ledgerid', LEDGER_ID,
'--wall-clock-time',
'--log-level=INFO',
darPath
],
);
Expand All @@ -76,7 +77,7 @@ beforeAll(async () => {
getEnv('JSON_API'),
['--ledger-host', 'localhost', '--ledger-port', `${sandboxPort}`,
'--port-file', JSON_API_PORT_FILE, '--http-port', "0",
'--allow-insecure-tokens', '--websocket-config', 'heartBeatPer=1'],
'--allow-insecure-tokens', '--websocket-config', '--heartBeatPer=1', '--log-level=INFO'],
['-Dakka.http.server.request-timeout=60s'],
)
await waitOn({resources: [`file:${JSON_API_PORT_FILE}`]})
Expand Down Expand Up @@ -165,7 +166,7 @@ test('create + fetch & exercise', async () => {
const alice5Contract = await aliceLedger.create(buildAndLint.Main.Person, alice5);
expect(alice5Contract.payload).toEqual(alice5);
expect(alice5Contract.key).toEqual(alice5Key);
expect(await aliceStream.next()).toEqual([[alice5Contract], [{created: alice5Contract}]]);
expect(await aliceStream.next()).toEqual([[alice5Contract], [{created: alice5Contract, matchedQueries: [0]}]]);

let personContracts = await aliceLedger.query(buildAndLint.Main.Person);
expect(personContracts).toEqual([alice5Contract]);
Expand Down Expand Up @@ -198,7 +199,7 @@ test('create + fetch & exercise', async () => {
expect(alice6Contract.contractId).toEqual(result);
expect(alice6Contract.payload).toEqual({...alice5, age: '6'});
expect(alice6Contract.key).toEqual({...alice5Key, _2: '6'});
expect(await aliceStream.next()).toEqual([[alice6Contract], [{archived: alice5Archived}, {created: alice6Contract}]]);
expect(await aliceStream.next()).toEqual([[alice6Contract], [{archived: alice5Archived}, {created: alice6Contract, matchedQueries:[0]}]]);

alice5ContractById = await aliceLedger.fetch(buildAndLint.Main.Person, alice5Contract.contractId);
expect(alice5ContractById).toBeNull();
Expand All @@ -215,7 +216,7 @@ test('create + fetch & exercise', async () => {
const personRawStream = aliceLedger.streamQuery(buildAndLint.Main.Person);
const personStream = promisifyStream(personRawStream);
const personStreamLive = pEvent(personRawStream, 'live');
expect(await personStream.next()).toEqual([[alice6Contract], [{created: alice6Contract}]]);
expect(await personStream.next()).toEqual([[alice6Contract], [{created: alice6Contract, matchedQueries:[1]}]]);

// end of non-live data, first offset
expect(await personStreamLive).toEqual([alice6Contract]);
Expand All @@ -225,7 +226,7 @@ test('create + fetch & exercise', async () => {
const bob4Contract = await bobLedger.create(buildAndLint.Main.Person, bob4);
expect(bob4Contract.payload).toEqual(bob4);
expect(bob4Contract.key).toEqual(bob4Key);
expect(await personStream.next()).toEqual([[alice6Contract, bob4Contract], [{created: bob4Contract}]]);
expect(await personStream.next()).toEqual([[alice6Contract, bob4Contract], [{created: bob4Contract, matchedQueries:[1]}]]);


// Alice changes her name.
Expand All @@ -240,9 +241,9 @@ test('create + fetch & exercise', async () => {
expect(cooper6Contract.contractId).toEqual(result);
expect(cooper6Contract.payload).toEqual({...alice5, name: 'Alice Cooper', age: '6'});
expect(cooper6Contract.key).toEqual(alice6Key);
expect(await aliceStream.next()).toEqual([[cooper6Contract], [{archived: alice6Archived}, {created: cooper6Contract}]]);
expect(await aliceStream.next()).toEqual([[cooper6Contract], [{archived: alice6Archived}, {created: cooper6Contract, matchedQueries:[0]}]]);
expect(await alice6KeyStream.next()).toEqual([cooper6Contract, [{archived: alice6Archived}, {created: cooper6Contract}]]);
expect(await personStream.next()).toEqual([[bob4Contract, cooper6Contract], [{archived: alice6Archived}, {created: cooper6Contract}]]);
expect(await personStream.next()).toEqual([[bob4Contract, cooper6Contract], [{archived: alice6Archived}, {created: cooper6Contract, matchedQueries:[1]}]]);

personContracts = await aliceLedger.query(buildAndLint.Main.Person);
expect(personContracts).toEqual([bob4Contract, cooper6Contract]);
Expand Down
50 changes: 41 additions & 9 deletions language-support/ts/daml-ledger/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

import { Template, Choice, ContractId } from "@daml/types";
import { Template, Choice, ContractId, registerTemplate } from "@daml/types";
import Ledger, {CreateEvent} from "./index";
import {assert} from "./index";
import { Event } from "./index";
Expand Down Expand Up @@ -35,13 +35,19 @@ let mockInstance = undefined as unknown as MockWebSocket;

jest.mock('isomorphic-ws', () => class {
private eventEmitter: EventEmitter;
//to represent readyState constants https://github.com/websockets/ws/blob/master/doc/ws.md#ready-state-constants
public readyState: number;

constructor(...args: unknown[]) {
mockConstructor(...args);
mockInstance = this;
// eslint-disable-next-line @typescript-eslint/no-var-requires
const {EventEmitter} = require('events');
this.eventEmitter = new EventEmitter();

// eslint-disable-next-line @typescript-eslint/no-var-requires
const {WsState} = require('./index');
this.readyState = WsState.Connecting;
}

addEventListener(event: string, handler: (...args: unknown[]) => void): void {
Expand All @@ -53,11 +59,17 @@ jest.mock('isomorphic-ws', () => class {
}

close(): void {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const {WsState} = require('./index');
this.readyState = WsState.Closed;
mockClose();
}

serverOpen(): void {
this.eventEmitter.emit('open');
// eslint-disable-next-line @typescript-eslint/no-var-requires
const {WsState} = require('./index');
this.readyState = WsState.Open;
}

serverSend(message: Message): void {
Expand All @@ -66,11 +78,15 @@ jest.mock('isomorphic-ws', () => class {

serverClose(event: {code: number; reason: string}): void {
this.eventEmitter.emit('close', event);
// eslint-disable-next-line @typescript-eslint/no-var-requires
const {WsState} = require('./index');
this.readyState = WsState.Closing;
}
});




const Foo: Template<Foo, string, "foo-id"> = {
sdkVersion: '0.0.0-SDKVERSION',
templateId: "foo-id",
Expand All @@ -97,7 +113,7 @@ const fooCreateEvent = (
};

const fooEvent = (coid: number): Event<Foo, string, "foo-id"> => {
return { created: fooCreateEvent(coid) };
return { created: fooCreateEvent(coid), matchedQueries: [0] };
};


Expand Down Expand Up @@ -152,6 +168,7 @@ describe("streamSubmit", () => {
const stream = ledger.streamQueries(Foo, []);
stream.on("change", mockChange);
const restoreConsole = mockConsole();
mockInstance.serverOpen();
mockInstance.serverSend({ warnings: ["oh oh"] });
expect(console.warn).toHaveBeenCalledWith("Ledger.streamQueries warnings", {"warnings": ["oh oh"]});
restoreConsole();
Expand All @@ -162,6 +179,7 @@ describe("streamSubmit", () => {
const stream = ledger.streamFetchByKey(Foo, fooKey);
stream.on("change", mockChange);
const restoreConsole = mockConsole();
mockInstance.serverOpen();
mockInstance.serverSend({ errors: ["not good!"] });
expect(console.error).toHaveBeenCalledWith("Ledger.streamFetchByKey errors", { errors: ["not good!"] });
restoreConsole();
Expand All @@ -172,31 +190,35 @@ describe("streamSubmit", () => {
const stream = ledger.streamQuery(Foo);
stream.on("live", mockLive);
stream.on("change", state => mockChange(state));
mockInstance.serverOpen();
mockInstance.serverSend({ events: [], offset: null });
expect(mockLive).toHaveBeenCalledTimes(1);
expect(mockLive).toHaveBeenLastCalledWith([]);
expect(mockChange).not.toHaveBeenCalled();
});

test("reconnect on server close", async () => {
test("reconnect on server close with appropriate offsets", async () => {
const reconnectThreshold = 200;
const ledger = new Ledger({...mockOptions, reconnectThreshold: reconnectThreshold});
const stream = ledger.streamQuery(Foo);
const stream = ledger.streamQuery(Foo, {"key": "1"});
stream.on("live", mockLive);
stream.on("close", mockClose);
mockInstance.serverSend({events: [], offset: '3'});
mockInstance.serverOpen();
mockInstance.serverSend({events: [], offset: "4"});
await new Promise(resolve => setTimeout(resolve, reconnectThreshold * 2));
mockConstructor.mockClear();
mockInstance.serverClose({code: 1, reason: 'test close'});
expect(mockConstructor).toHaveBeenCalled();
mockInstance.serverOpen();
expect(mockSend).toHaveBeenNthCalledWith(1, {offset: "3"});
expect(mockSend).toHaveBeenNthCalledWith(2, [{"templateIds": ["foo-id"]}]);
//first query with no offsets.
expect(mockSend).toHaveBeenNthCalledWith(1, [{"query": {"key": "1"}, "templateIds": ["foo-id"]}]);
//subsequent one on reconnection with offsets received
expect(mockSend).toHaveBeenNthCalledWith(2, [{"offset": "4", "query": {"key": "1"}, "templateIds": ["foo-id"]}]);
mockSend.mockClear();
mockConstructor.mockClear();

// check that the client doesn't try to reconnect again. it should only reconnect if it
// received an event confirming the stream is live again, i.e. {events: [], offset: '3'}
// received an event confirming the stream is live again, i.e. {events: [], offset: '4'}
mockInstance.serverClose({code: 1, reason: 'test close'});
expect(mockConstructor).not.toHaveBeenCalled();
});
Expand All @@ -220,11 +242,13 @@ describe("streamSubmit", () => {

test("stop listening to a stream", () => {
const ledger = new Ledger(mockOptions);
registerTemplate(Foo as unknown as Template<Foo, unknown, string>);
const stream = ledger.streamQuery(Foo);
const count1 = jest.fn();
const count2 = jest.fn();
stream.on("change", count1);
stream.on("change", count2);
mockInstance.serverOpen();
mockInstance.serverSend({ events: [1, 2, 3].map(fooEvent) });
expect(count1).toHaveBeenCalledTimes(1);
expect(count2).toHaveBeenCalledTimes(1);
Expand All @@ -241,6 +265,7 @@ describe("streamQuery", () => {
const stream = ledger.streamQuery(Foo);
stream.on("live", mockLive);
stream.on("change", state => mockChange(state));
mockInstance.serverOpen();
mockInstance.serverSend({ events: [fooEvent(1)], offset: '3' });
expect(mockLive).toHaveBeenCalledTimes(1);
expect(mockLive).toHaveBeenLastCalledWith([fooCreateEvent(1)]);
Expand All @@ -252,6 +277,7 @@ describe("streamQuery", () => {
const ledger = new Ledger(mockOptions);
const stream = ledger.streamQuery(Foo);
stream.on("change", state => mockChange(state));
mockInstance.serverOpen();
mockInstance.serverSend({ events: [fooEvent(1)] });
expect(mockChange).toHaveBeenCalledTimes(1);
expect(mockChange).toHaveBeenLastCalledWith([fooCreateEvent(1)]);
Expand All @@ -261,6 +287,7 @@ describe("streamQuery", () => {
const ledger = new Ledger(mockOptions);
const stream = ledger.streamQuery(Foo);
stream.on("change", state => mockChange(state));
mockInstance.serverOpen();
mockInstance.serverSend({ events: [1, 2, 3].map(fooEvent) });
expect(mockChange).toHaveBeenCalledTimes(1);
expect(mockChange).toHaveBeenCalledWith([1, 2, 3].map(cid => fooCreateEvent(cid)));
Expand All @@ -270,6 +297,7 @@ describe("streamQuery", () => {
const ledger = new Ledger(mockOptions);
const stream = ledger.streamQuery(Foo);
stream.on("change", state => mockChange(state));
mockInstance.serverOpen();
mockInstance.serverSend({ events: [fooEvent(1), fooEvent(2)] });
expect(mockChange).toHaveBeenCalledTimes(1);
expect(mockChange).toHaveBeenCalledWith([fooCreateEvent(1), fooCreateEvent(2)]);
Expand All @@ -286,6 +314,7 @@ describe("streamQueries", () => {
const stream = ledger.streamQueries(Foo, []);
stream.on("live", mockLive);
stream.on("change", state => mockChange(state));
mockInstance.serverOpen();
mockInstance.serverSend({ events: [fooEvent(1)], offset: '3' });
expect(mockLive).toHaveBeenCalledTimes(1);
expect(mockLive).toHaveBeenLastCalledWith([fooCreateEvent(1)]);
Expand All @@ -297,6 +326,7 @@ describe("streamQueries", () => {
const ledger = new Ledger(mockOptions);
const stream = ledger.streamQueries(Foo, []);
stream.on("change", state => mockChange(state));
mockInstance.serverOpen();
mockInstance.serverSend({ events: [fooEvent(1)] });
expect(mockChange).toHaveBeenCalledTimes(1);
expect(mockChange).toHaveBeenLastCalledWith([fooCreateEvent(1)]);
Expand All @@ -306,6 +336,7 @@ describe("streamQueries", () => {
const ledger = new Ledger(mockOptions);
const stream = ledger.streamQueries(Foo, []);
stream.on("change", state => mockChange(state));
mockInstance.serverOpen();
mockInstance.serverSend({ events: [1, 2, 3].map(fooEvent) });
expect(mockChange).toHaveBeenCalledTimes(1);
expect(mockChange).toHaveBeenCalledWith([1, 2, 3].map(cid => fooCreateEvent(cid)));
Expand All @@ -315,6 +346,7 @@ describe("streamQueries", () => {
const ledger = new Ledger(mockOptions);
const stream = ledger.streamQueries(Foo, []);
stream.on("change", state => mockChange(state));
mockInstance.serverOpen();
mockInstance.serverSend({ events: [fooEvent(1), fooEvent(2)] });
expect(mockChange).toHaveBeenCalledTimes(1);
expect(mockChange).toHaveBeenCalledWith([fooCreateEvent(1), fooCreateEvent(2)]);
Expand Down Expand Up @@ -400,7 +432,7 @@ describe("streamFetchByKeys", () => {
});

test("watch multiple keys", () => {
const create = (cid: number, key: string): Event<Foo> => ({created: fooCreateEvent(cid, key)});
const create = (cid: number, key: string): Event<Foo> => ({created: fooCreateEvent(cid, key), matchedQueries: [0]});
const archive = fooArchiveEvent;
const send = (events: Event<Foo>[]): void => mockInstance.serverSend({events});
const expectCids = (expected: (number | null)[]): void => expect(mockChange) .toHaveBeenCalledWith( expected.map((cid: number | null, idx) => cid ? fooCreateEvent(cid, 'key' + (idx + 1)) : null));
Expand Down
Loading

0 comments on commit 05505e3

Please sign in to comment.