From 2bc7bff2eff9331d3a52830d0275d47fc7c59e78 Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Sun, 8 Aug 2021 11:25:31 +0200 Subject: [PATCH] fix(rstream): fix #305, metaStream() factory arg type - add test case --- packages/rstream/src/metastream.ts | 43 ++++++++++++++--------------- packages/rstream/test/metastream.ts | 25 +++++++++++++++-- 2 files changed, 43 insertions(+), 25 deletions(-) diff --git a/packages/rstream/src/metastream.ts b/packages/rstream/src/metastream.ts index ea9c10bd32..3003f4ed98 100644 --- a/packages/rstream/src/metastream.ts +++ b/packages/rstream/src/metastream.ts @@ -1,4 +1,4 @@ -import { assert, Fn } from "@thi.ng/api"; +import { assert, Fn, Nullable } from "@thi.ng/api"; import { CloseMode, CommonOpts, ISubscription, State } from "./api"; import { Subscription } from "./subscription"; import { optsWithID } from "./utils/idgen"; @@ -15,30 +15,27 @@ export interface MetaStreamOpts extends CommonOpts { } /** - * Returns a {@link Subscription} which transforms each incoming value - * into a new {@link Stream}, subscribes to it (via an hidden / internal + * Returns a {@link Subscription} which transforms each incoming value into a + * new {@link ISubscription}, subscribes to it (via an hidden / internal * subscription) and then only passes values from that stream to its own * subscribers. * * @remarks - * If a new value is received, the metastream first unsubscribes from - * any still active stream, before creating and subscribing to the new - * stream. Hence this stream type is useful for cases where streams need - * to be dynamically created & inserted into an existing dataflow - * topology. + * If a new value is received, the metastream first unsubscribes from any still + * active previous stream (if any), before creating and subscribing to the new + * one. Hence this stream type is useful for cases where streams need to be + * dynamically created & inserted into an existing dataflow topology. * - * The user supplied `factory` function will be called for each incoming - * value and is responsible for creating the new stream instances. If - * the function returns null/undefined, no further action will be taken - * (acts like a filter transducer). + * The user supplied `factory` function will be called for each incoming value + * and is responsible for creating the new stream instances. If the function + * returns null/undefined, no further action will be taken (acts like a filter + * transducer). * - * The factory function does NOT need to create *new* streams, but can - * merely return other existing streams, and so making the meta stream - * act like a switch with arbitrary criteria. - * - * If the meta stream itself is the only subscriber to existing input - * streams, you'll need to configure the input's - * {@link CommonOpts.closeOut} option to keep them alive and support + * The factory function does NOT need to create *new* streams, but can merely + * return other existing streams, and so making the meta stream act like a + * switch with arbitrary criteria. However, if the meta stream itself is the + * only subscriber to such existing input streams, you'll need to configure the + * input's {@link CommonOpts.closeOut} option to keep them alive and support * dynamic switching between them. * * @example @@ -97,7 +94,7 @@ export interface MetaStreamOpts extends CommonOpts { * @param id - */ export const metaStream = ( - factory: Fn>, + factory: Fn>>, opts?: Partial ) => new MetaStream(factory, opts); @@ -105,14 +102,14 @@ export const metaStream = ( * @see {@link metaStream} for reference & examples. */ export class MetaStream extends Subscription { - factory: Fn>; - stream?: Subscription; + factory: Fn>>; + stream?: ISubscription; sub?: ISubscription; emitLast: boolean; doneRequested: boolean; constructor( - factory: Fn>, + factory: Fn>>, opts: Partial = {} ) { super(undefined, optsWithID("metastram", opts)); diff --git a/packages/rstream/test/metastream.ts b/packages/rstream/test/metastream.ts index 5303683087..6ee6ad7c99 100644 --- a/packages/rstream/test/metastream.ts +++ b/packages/rstream/test/metastream.ts @@ -1,5 +1,5 @@ import * as assert from "assert"; -import { CloseMode, fromIterable, metaStream } from "../src"; +import { CloseMode, fromIterable, metaStream, reactive } from "../src"; import { TIMEOUT } from "./config"; import { assertActive, assertUnsub } from "./utils"; @@ -7,12 +7,12 @@ describe("MetaStream", function () { this.retries(3); it("basic", (done) => { + const acc: number[] = []; const src = fromIterable([1, 2, 3], { delay: TIMEOUT }); const meta = metaStream((x) => fromIterable([x * 10, x * 20, x * 30], { delay: TIMEOUT >> 2 }) ); const sub = src.subscribe(meta); - const acc: number[] = []; const sub2 = sub.subscribe({ next(x) { acc.push(x); @@ -27,6 +27,27 @@ describe("MetaStream", function () { }, 5 * TIMEOUT); }); + it("null", (done) => { + const acc: number[] = []; + const src = fromIterable([1, 2, 3], { delay: TIMEOUT }); + const meta = metaStream((x) => + x & 1 ? reactive(x) : null + ); + const sub = src.subscribe(meta); + const sub2 = sub.subscribe({ + next(x) { + acc.push(x); + }, + }); + setTimeout(() => { + assert.deepStrictEqual(acc, [1, 3]); + assertUnsub(meta); + assertUnsub(sub); + assertUnsub(sub2); + done(); + }, 5 * TIMEOUT); + }); + it("closein", (done) => { const src = fromIterable([1], { delay: TIMEOUT }); const meta = metaStream((x) => fromIterable([x]), {