Skip to content

Commit

Permalink
feat(rstream): add fromTuple(), update StreamObj impl
Browse files Browse the repository at this point in the history
- update docs
- add tests
  • Loading branch information
postspectacular committed Dec 5, 2024
1 parent 7f97d3a commit ef691cc
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 36 deletions.
112 changes: 77 additions & 35 deletions packages/rstream/src/object.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { IObjectOf, Keys, NumOrString, Predicate2 } from "@thi.ng/api";
import { dedupe } from "@thi.ng/transducers/dedupe";
import { range } from "@thi.ng/transducers/range";
import type { CommonOpts, ISubscription, SubscriptionOpts } from "./api.js";
import { __optsWithID } from "./idgen.js";
import { Subscription, subscription } from "./subscription.js";
Expand Down Expand Up @@ -43,36 +44,26 @@ export interface StreamObjOpts<T, K extends Keys<T>> extends CommonOpts {

/**
* Takes an arbitrary object `src` and object of options (see
* {@link StreamObjOpts}). Creates a new object and for each selected
* key creates a new stream, optionally seeded with the key's value in
* `src`. Returns new object of streams.
* {@link StreamObjOpts}). Creates a new object and for each selected key
* creates a new subscription, optionally seeded with the key's value in `src`.
* Returns new {@link StreamObj}.
*
* @remarks
* The structure of the returned object is
* {@link StreamObj | as follows}:
*
* ```text
* {
* streams: { ... },
* next(x): void;
* done(): void;
* }
* ```
*
* All streams will be stored under `streams`. The `next()` and `done()`
* functions/methods allow the object itself to be used as subscriber
* for an upstream subscribable (see 2nd example below):
*
* - `next()` - takes a object of same type as `src` and feeds each
* key's new value into its respective stream. If the `defaults`
* option is given, `undefined` key values are replaced with their
* specified default. If `dedupe` is enabled (default) only changed
* values (as per `equiv` predicate option) will be propagated
* downstream.
* - `done()` - calls {@link ISubscriber.done} on all streams
*
* The optional `opts` arg is used to customize overall behavior of
* `fromObject` and specify shared options for *all* created streams.
* The options arg is used to customize overall behavior of `fromObject` and
* specify shared options for *all* created streams.
*
* A {@link StreamObj} is a full {@link Subscription}, in which additionally all
* configured key streams are exposed under `streams`. The
* {@link StreamObj.next} and {@link StreamObj.done} methods allow the
* {@link StreamObj} itself to be used as subscriber for an upstream
* subscribable (see 2nd example below):
*
* {@link StreamObj.next} receives an object of same type as `src` and feeds
* each key's new value into its respective {@link StreamObj.streams}. If the
* {@link StreamObjOpts.defaults} option is given, `undefined` key values are
* replaced with their specified default. If {@link StreamObjOpts.dedupe} is
* enabled (default) only changed values (as per {@link StreamObjOpts.equiv}
* predicate option) will be propagated downstream.
*
* @example
* ```ts tangle:../export/from-object.ts
Expand Down Expand Up @@ -104,7 +95,7 @@ export interface StreamObjOpts<T, K extends Keys<T>> extends CommonOpts {
* obj.streams.b.subscribe(trace("b"));
*
* const src = subscription<Foo, Foo>();
* // use as subscriber
* // use `obj` as subscriber itself
* src.subscribe(obj);
*
* src.next({ a: 1, b: "foo" });
Expand All @@ -120,6 +111,45 @@ export const fromObject = <T extends object, K extends Keys<T>>(
opts: Partial<StreamObjOpts<T, K>> = {}
) => new StreamObj<T, K>(src, opts);

/**
* Syntax sugar for {@link fromObject} for tuple/arrays. Returns a
* {@link StreamObj} which provides individual subscriptions for each tuple
* element, i.e. for 1:N fanout.
*
* @remarks
* This construct is very useful for UI purposes, helping to provide both
* finegrained and tuple-based reactive state for UI components used to edit
* tuple/vector values (e.g. via individual per-tuple-element input
* fields/controls).
*
* @example
* ```ts tangle:../export/from-tuple.ts
* import { fromTuple, subscription, trace } from "@thi.ng/rstream";
*
* const tup = fromTuple([10, 20, 30]);
*
* tup.streams[0].subscribe(trace("[0]:"));
* tup.streams[1].subscribe(trace("[1]:"));
* tup.streams[2].subscribe(trace("[2]:"));
*
* // [0]: 10
* // [1]: 20
* // [2]: 30
*
* tup.next([100,20,30]);
*
* // [0]: 100
* // (the two other streams didn't update since their values haven't changed)
* ```
*
* @param src
* @param opts
*/
export const fromTuple = <T>(
src: T[],
opts?: Partial<StreamObjOpts<T[], number>>
) => new StreamObj<T[], number>(src, { keys: [...range(src.length)], ...opts });

/**
* See {@link fromObject} for details.
*/
Expand Down Expand Up @@ -155,8 +185,13 @@ export class StreamObj<
}

/**
* Feeds new values from `x` to each registered key's stream.
* Satifies {@link ISubscriber.next} interface.
* Receives an object of configured type and feeds each key's new value into
* its respective {@link StreamObj.streams}. If the
* {@link StreamObjOpts.defaults} option is given, `undefined` key values
* are replaced with their specified default. If
* {@link StreamObjOpts.dedupe} is enabled (default) only changed values (as
* per {@link StreamObjOpts.equiv} predicate option) will be propagated
* downstream.
*
* @param x -
*/
Expand All @@ -168,15 +203,22 @@ export class StreamObj<
this.defaults && val === undefined ? this.defaults[<K>k] : val
);
}
super.next(x);
}

/**
* Calls {@link ISubscriber.done} for all streams created. Satifies
* {@link ISubscriber.done} interface.
*/
done() {
for (let k of this.keys) {
this.streams[k].done();
}
super.done();
}

unsubscribe(sub?: ISubscription<T, any> | undefined) {
if (!sub) {
for (let k of this.keys) {
this.streams[k].unsubscribe();
}
}
return super.unsubscribe(sub);
}
}
34 changes: 33 additions & 1 deletion packages/rstream/test/object.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect, test } from "bun:test";
import { Subscription, fromObject, stream } from "../src/index.js";
import { Subscription, fromObject, fromTuple, stream } from "../src/index.js";
import { assertUnsub } from "./utils.js";

type Foo = { a?: number; b: string };
Expand Down Expand Up @@ -96,3 +96,35 @@ test("defaults & dedupe", () => {
b: ["foo", "bar", "baz"],
});
});

test("fromTuple", () => {
const acc = new Map<number, number>();
const collect = {
next(x: number) {
acc.set(x, (acc.get(x) ?? 0) + 1);
},
};
const tup = fromTuple([1, 2, 3]);
expect(new Set(Object.keys(tup.streams))).toEqual(new Set(["0", "1", "2"]));
tup.streams[0].subscribe(collect);
tup.streams[1].subscribe(collect);
tup.streams[2].subscribe(collect);
expect(acc).toEqual(
new Map([
[1, 1],
[2, 1],
[3, 1],
])
);
tup.next([1, 20, 3]);
tup.next([10, 2, 3]);
expect(acc).toEqual(
new Map([
[1, 1],
[2, 2],
[3, 1],
[10, 1],
[20, 1],
])
);
});

0 comments on commit ef691cc

Please sign in to comment.