From d938a44ccdd41406ef249d64e82c3f517c4907e8 Mon Sep 17 00:00:00 2001 From: akshayshirahatti-da <86774832+akshayshirahatti-da@users.noreply.github.com> Date: Thu, 11 Nov 2021 21:23:35 +0000 Subject: [PATCH] ws multiplexing is disabled by default as we investigate issues with intermittent web socket closures (#11657) CHANGELOG_BEGIN [TS-Bindings] Ws multiplexing for stream queries is disabled by default as we investigate issues of intermittent websocket closures. CHANGELOG_END --- .../build-and-lint-test/src/__tests__/test.ts | 6 ++-- language-support/ts/daml-ledger/index.test.ts | 30 +++++++++++++++++-- language-support/ts/daml-ledger/index.ts | 10 +++---- 3 files changed, 36 insertions(+), 10 deletions(-) diff --git a/language-support/ts/codegen/tests/ts/build-and-lint-test/src/__tests__/test.ts b/language-support/ts/codegen/tests/ts/build-and-lint-test/src/__tests__/test.ts index 6432dd89f388..938ce13ac3a1 100644 --- a/language-support/ts/codegen/tests/ts/build-and-lint-test/src/__tests__/test.ts +++ b/language-support/ts/codegen/tests/ts/build-and-lint-test/src/__tests__/test.ts @@ -216,7 +216,7 @@ test('create + fetch & exercise', async () => { const personRawStream = aliceLedger.streamQuery(buildAndLint.Main.Person); const personStream = promisifyStream(personRawStream); const personStreamLive = pEvent(personRawStream, 'live'); - expect(await personStream.next()).toEqual([[alice6Contract], [{created: alice6Contract, matchedQueries:[1]}]]); + expect(await personStream.next()).toEqual([[alice6Contract], [{created: alice6Contract, matchedQueries:[0]}]]); // end of non-live data, first offset expect(await personStreamLive).toEqual([alice6Contract]); @@ -226,7 +226,7 @@ test('create + fetch & exercise', async () => { const bob4Contract = await bobLedger.create(buildAndLint.Main.Person, bob4); expect(bob4Contract.payload).toEqual(bob4); expect(bob4Contract.key).toEqual(bob4Key); - expect(await personStream.next()).toEqual([[alice6Contract, bob4Contract], [{created: bob4Contract, matchedQueries:[1]}]]); + expect(await personStream.next()).toEqual([[alice6Contract, bob4Contract], [{created: bob4Contract, matchedQueries:[0]}]]); // Alice changes her name. @@ -243,7 +243,7 @@ test('create + fetch & exercise', async () => { expect(cooper6Contract.key).toEqual(alice6Key); expect(await aliceStream.next()).toEqual([[cooper6Contract], [{archived: alice6Archived}, {created: cooper6Contract, matchedQueries:[0]}]]); expect(await alice6KeyStream.next()).toEqual([cooper6Contract, [{archived: alice6Archived}, {created: cooper6Contract}]]); - expect(await personStream.next()).toEqual([[bob4Contract, cooper6Contract], [{archived: alice6Archived}, {created: cooper6Contract, matchedQueries:[1]}]]); + expect(await personStream.next()).toEqual([[bob4Contract, cooper6Contract], [{archived: alice6Archived}, {created: cooper6Contract, matchedQueries:[0]}]]); personContracts = await aliceLedger.query(buildAndLint.Main.Person); expect(personContracts).toEqual([bob4Contract, cooper6Contract]); diff --git a/language-support/ts/daml-ledger/index.test.ts b/language-support/ts/daml-ledger/index.test.ts index 56c85b6b0bbf..7c1a1ba03c44 100644 --- a/language-support/ts/daml-ledger/index.test.ts +++ b/language-support/ts/daml-ledger/index.test.ts @@ -197,9 +197,9 @@ describe("streamSubmit", () => { expect(mockChange).not.toHaveBeenCalled(); }); - test("reconnect on server close with appropriate offsets", async () => { + test("reconnect on server close with appropriate offsets, when ws multiplexing is enabled", async () => { const reconnectThreshold = 200; - const ledger = new Ledger({...mockOptions, reconnectThreshold: reconnectThreshold}); + const ledger = new Ledger({...mockOptions, reconnectThreshold: reconnectThreshold, multiplexQueryStreams: true}); const stream = ledger.streamQuery(Foo, {"key": "1"}); stream.on("live", mockLive); stream.on("close", mockClose); @@ -223,6 +223,32 @@ describe("streamSubmit", () => { expect(mockConstructor).not.toHaveBeenCalled(); }); + + test("reconnect on server close with appropriate offsets, when ws multiplexing is disabled", async () => { + const reconnectThreshold = 200; + const ledger = new Ledger({...mockOptions, reconnectThreshold: reconnectThreshold, multiplexQueryStreams: false}); + const stream = ledger.streamQuery(Foo); + stream.on("live", mockLive); + stream.on("close", mockClose); + mockInstance.serverOpen(); + mockInstance.serverSend({events: [], offset: "3"}); + await new Promise(resolve => setTimeout(resolve, reconnectThreshold * 2)); + mockConstructor.mockClear(); + mockInstance.serverClose({code: 1, reason: 'test close'}); + expect(mockConstructor).toHaveBeenCalled(); + mockInstance.serverOpen(); + expect(mockSend).toHaveBeenNthCalledWith(1, [{"templateIds": ["foo-id"]}]); //initial query + expect(mockSend).toHaveBeenNthCalledWith(2, {offset: "3"}); // offsets sent when reconnecting. + expect(mockSend).toHaveBeenNthCalledWith(3, [{"templateIds": ["foo-id"]}]); //reconnect query request. + mockSend.mockClear(); + mockConstructor.mockClear(); + + // check that the client doesn't try to reconnect again. it should only reconnect if it + // received an event confirming the stream is live again, i.e. {events: [], offset: '4'} + mockInstance.serverClose({code: 1, reason: 'test close'}); + expect(mockConstructor).not.toHaveBeenCalled(); + }); + test("do not reconnect on client close", () => { const ledger = new Ledger(mockOptions); const stream = ledger.streamQuery(Foo); diff --git a/language-support/ts/daml-ledger/index.ts b/language-support/ts/daml-ledger/index.ts index 8cb93ef9101b..98939a469ce2 100644 --- a/language-support/ts/daml-ledger/index.ts +++ b/language-support/ts/daml-ledger/index.ts @@ -703,7 +703,7 @@ class Ledger { /** * Construct a new `Ledger` object. See [[LedgerOptions]] for the constructor arguments. */ - constructor({token, httpBaseUrl, wsBaseUrl, reconnectThreshold = 30000, multiplexQueryStreams = true}: LedgerOptions) { + constructor({token, httpBaseUrl, wsBaseUrl, reconnectThreshold = 30000, multiplexQueryStreams = false}: LedgerOptions) { if (!httpBaseUrl) { httpBaseUrl = `${window.location.protocol}//${window.location.host}/`; } @@ -1054,11 +1054,11 @@ class Ledger { } } } else if (isRecordWith('warnings', json)) { - console.warn(`Ledger.${callerName} warnings`, json); + console.warn(`${callerName} warnings`, json); } else if (isRecordWith('errors', json)) { - console.error(`Ledger.${callerName} errors`, json); + console.error(`${callerName} errors`, json); } else { - console.error(`Ledger.${callerName} unknown message`, json); + console.error(`${callerName} unknown message`, json); } }; const closeStream = (status: { code: number; reason: string }): void => { @@ -1242,7 +1242,7 @@ class Ledger { lastContractId = contract ? contract.contractId : null return contract; } - return this.streamSubmit("streamFetchByKey", template, 'v1/stream/fetch', request, reconnectRequest, null, change); + return this.streamSubmit("Ledger.streamFetchByKey", template, 'v1/stream/fetch', request, reconnectRequest, null, change); } /**