diff --git a/packages/rstream/src/object.ts b/packages/rstream/src/object.ts index a899e33acc..f6d47b83e3 100644 --- a/packages/rstream/src/object.ts +++ b/packages/rstream/src/object.ts @@ -1,5 +1,6 @@ import type { IObjectOf, Keys, NumOrString, Predicate2 } from "@thi.ng/api"; import { dedupe } from "@thi.ng/transducers/dedupe"; +import { range } from "@thi.ng/transducers/range"; import type { CommonOpts, ISubscription, SubscriptionOpts } from "./api.js"; import { __optsWithID } from "./idgen.js"; import { Subscription, subscription } from "./subscription.js"; @@ -43,36 +44,26 @@ export interface StreamObjOpts> extends CommonOpts { /** * Takes an arbitrary object `src` and object of options (see - * {@link StreamObjOpts}). Creates a new object and for each selected - * key creates a new stream, optionally seeded with the key's value in - * `src`. Returns new object of streams. + * {@link StreamObjOpts}). Creates a new object and for each selected key + * creates a new subscription, optionally seeded with the key's value in `src`. + * Returns new {@link StreamObj}. * * @remarks - * The structure of the returned object is - * {@link StreamObj | as follows}: - * - * ```text - * { - * streams: { ... }, - * next(x): void; - * done(): void; - * } - * ``` - * - * All streams will be stored under `streams`. The `next()` and `done()` - * functions/methods allow the object itself to be used as subscriber - * for an upstream subscribable (see 2nd example below): - * - * - `next()` - takes a object of same type as `src` and feeds each - * key's new value into its respective stream. If the `defaults` - * option is given, `undefined` key values are replaced with their - * specified default. If `dedupe` is enabled (default) only changed - * values (as per `equiv` predicate option) will be propagated - * downstream. - * - `done()` - calls {@link ISubscriber.done} on all streams - * - * The optional `opts` arg is used to customize overall behavior of - * `fromObject` and specify shared options for *all* created streams. + * The options arg is used to customize overall behavior of `fromObject` and + * specify shared options for *all* created streams. + * + * A {@link StreamObj} is a full {@link Subscription}, in which additionally all + * configured key streams are exposed under `streams`. The + * {@link StreamObj.next} and {@link StreamObj.done} methods allow the + * {@link StreamObj} itself to be used as subscriber for an upstream + * subscribable (see 2nd example below): + * + * {@link StreamObj.next} receives an object of same type as `src` and feeds + * each key's new value into its respective {@link StreamObj.streams}. If the + * {@link StreamObjOpts.defaults} option is given, `undefined` key values are + * replaced with their specified default. If {@link StreamObjOpts.dedupe} is + * enabled (default) only changed values (as per {@link StreamObjOpts.equiv} + * predicate option) will be propagated downstream. * * @example * ```ts tangle:../export/from-object.ts @@ -104,7 +95,7 @@ export interface StreamObjOpts> extends CommonOpts { * obj.streams.b.subscribe(trace("b")); * * const src = subscription(); - * // use as subscriber + * // use `obj` as subscriber itself * src.subscribe(obj); * * src.next({ a: 1, b: "foo" }); @@ -120,6 +111,45 @@ export const fromObject = >( opts: Partial> = {} ) => new StreamObj(src, opts); +/** + * Syntax sugar for {@link fromObject} for tuple/arrays. Returns a + * {@link StreamObj} which provides individual subscriptions for each tuple + * element, i.e. for 1:N fanout. + * + * @remarks + * This construct is very useful for UI purposes, helping to provide both + * finegrained and tuple-based reactive state for UI components used to edit + * tuple/vector values (e.g. via individual per-tuple-element input + * fields/controls). + * + * @example + * ```ts tangle:../export/from-tuple.ts + * import { fromTuple, subscription, trace } from "@thi.ng/rstream"; + * + * const tup = fromTuple([10, 20, 30]); + * + * tup.streams[0].subscribe(trace("[0]:")); + * tup.streams[1].subscribe(trace("[1]:")); + * tup.streams[2].subscribe(trace("[2]:")); + * + * // [0]: 10 + * // [1]: 20 + * // [2]: 30 + * + * tup.next([100,20,30]); + * + * // [0]: 100 + * // (the two other streams didn't update since their values haven't changed) + * ``` + * + * @param src + * @param opts + */ +export const fromTuple = ( + src: T[], + opts?: Partial> +) => new StreamObj(src, { keys: [...range(src.length)], ...opts }); + /** * See {@link fromObject} for details. */ @@ -155,8 +185,13 @@ export class StreamObj< } /** - * Feeds new values from `x` to each registered key's stream. - * Satifies {@link ISubscriber.next} interface. + * Receives an object of configured type and feeds each key's new value into + * its respective {@link StreamObj.streams}. If the + * {@link StreamObjOpts.defaults} option is given, `undefined` key values + * are replaced with their specified default. If + * {@link StreamObjOpts.dedupe} is enabled (default) only changed values (as + * per {@link StreamObjOpts.equiv} predicate option) will be propagated + * downstream. * * @param x - */ @@ -168,15 +203,22 @@ export class StreamObj< this.defaults && val === undefined ? this.defaults[k] : val ); } + super.next(x); } - /** - * Calls {@link ISubscriber.done} for all streams created. Satifies - * {@link ISubscriber.done} interface. - */ done() { for (let k of this.keys) { this.streams[k].done(); } + super.done(); + } + + unsubscribe(sub?: ISubscription | undefined) { + if (!sub) { + for (let k of this.keys) { + this.streams[k].unsubscribe(); + } + } + return super.unsubscribe(sub); } } diff --git a/packages/rstream/test/object.test.ts b/packages/rstream/test/object.test.ts index 7a5af72b7a..ea6959bf1e 100644 --- a/packages/rstream/test/object.test.ts +++ b/packages/rstream/test/object.test.ts @@ -1,5 +1,5 @@ import { expect, test } from "bun:test"; -import { Subscription, fromObject, stream } from "../src/index.js"; +import { Subscription, fromObject, fromTuple, stream } from "../src/index.js"; import { assertUnsub } from "./utils.js"; type Foo = { a?: number; b: string }; @@ -96,3 +96,35 @@ test("defaults & dedupe", () => { b: ["foo", "bar", "baz"], }); }); + +test("fromTuple", () => { + const acc = new Map(); + const collect = { + next(x: number) { + acc.set(x, (acc.get(x) ?? 0) + 1); + }, + }; + const tup = fromTuple([1, 2, 3]); + expect(new Set(Object.keys(tup.streams))).toEqual(new Set(["0", "1", "2"])); + tup.streams[0].subscribe(collect); + tup.streams[1].subscribe(collect); + tup.streams[2].subscribe(collect); + expect(acc).toEqual( + new Map([ + [1, 1], + [2, 1], + [3, 1], + ]) + ); + tup.next([1, 20, 3]); + tup.next([10, 2, 3]); + expect(acc).toEqual( + new Map([ + [1, 1], + [2, 2], + [3, 1], + [10, 1], + [20, 1], + ]) + ); +});