Skip to content

Commit

Permalink
feat(rstream): extend fromObject() features/opts
Browse files Browse the repository at this point in the history
- add support for default values, dedupe, equiv predicate
  • Loading branch information
postspectacular committed May 16, 2020
1 parent a17c27e commit 975f74c
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 31 deletions.
82 changes: 58 additions & 24 deletions packages/rstream/src/from/object.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import { Keys } from "@thi.ng/api";
import type { Keys, Predicate2 } from "@thi.ng/api";
import { dedupe } from "@thi.ng/transducers";
import type { CommonOpts, SubscriptionOpts } from "../api";
import { Subscription, subscription } from "../subscription";
import { CommonOpts } from "../api";
import { optsWithID } from "../utils/idgen";
import { nextID } from "../utils/idgen";

export type KeyStreams<T, K extends Keys<T>> = {
[id in K]-?: Subscription<T[id], T[id]>;
};

/**
* Result object type for {@link fromObject}.
*/
export interface StreamObj<T, K extends Keys<T>> {
/**
* Object of managed streams for registered keys.
* Object of managed & typed streams for registered keys.
*/
streams: {
[id in K]-?: Subscription<T[id], T[id]>;
};
streams: KeyStreams<T, K>;
/**
* Feeds new values from `x` to each registered key's stream.
* Satifies {@link ISubscriber.next} interface.
Expand All @@ -27,20 +30,42 @@ export interface StreamObj<T, K extends Keys<T>> {
done(): void;
}

export interface StreamObjOpts extends CommonOpts {
export interface StreamObjOpts<T, K extends Keys<T>> extends CommonOpts {
/**
* Array of selected `keys` (else selects all by default) for which
* to create streams.
*/
keys: K[];
/**
* If true (default), all created streams will be seeded with key
* values from the source object.
*
* @defaultValue true
*/
initial: boolean;
/**
* Default values to use for `undefined` values of registered keys.
*/
defaults: Partial<T>;
/**
* If true, attaches {@link @thi.ng/transducers#dedupe} transducer
* to each key's value stream to avoid obsolete downstream
* propagation when a key's value hasn't actually changed.
*
* @defaultValue true
*/
dedupe: boolean;
/**
* Generic equality predicate to be used for `dedupe` (`===` by
* default). Ignored if `dedupe` option is false.
*/
equiv: Predicate2<any>;
}

/**
* Takes an arbitrary object `src` and optional array of `keys` (else
* selects all by default). Creates a new object and for each key
* creates a new stream, optionally seeded with the key's value in
* Takes an arbitrary object `src` and object of options (see
* {@link StreamObjOpts}). Creates a new object and for each selected
* key creates a new stream, optionally seeded with the key's value in
* `src`. Returns new object of streams.
*
* @remarks
Expand All @@ -55,18 +80,20 @@ export interface StreamObjOpts extends CommonOpts {
* }
* ```
*
* All streams (only for given `keys`) will be stored under `streams`.
* The `next()` and `done()` functions/methods allow the object itself
* to be used as subscriber for an upstream subscribable (see 2nd
* example):
* All streams will be stored under `streams`. The `next()` and `done()`
* functions/methods allow the object itself to be used as subscriber
* for an upstream subscribable (see 2nd example below):
*
* - `next()` - takes a object of same type as `src` and feeds each
* key's new value into its respective stream.
* key's new value into its respective stream. If the `defaults`
* option is given, `undefined` key values are replaced with their
* specified default. If `dedupe` is enabled (default) only changed
* values (as per `equiv` predicate option) will be propagated
* downstream.
* - `done()` - calls {@link ISubscriber.done} on all streams
*
* The optional `opts` arg is used to specify shared options for *all*
* created streams. If the `id` option isn't provided, each stream will
* get an autogenerated ID in the form `obj-${keyname}-${counter}`.
* The optional `opts` arg is used to customize overall behavior of
* `fromObject` and specify shared options for *all* created streams.
*
* @example
* ```ts
Expand Down Expand Up @@ -100,20 +127,27 @@ export interface StreamObjOpts extends CommonOpts {
* ```
*
* @param src
* @param keys
* @param opts
*/
export const fromObject = <T, K extends Keys<T>>(
src: T,
keys: K[] = <any>Object.keys(src),
opts: Partial<StreamObjOpts> = {}
opts: Partial<StreamObjOpts<T, K>> = {}
) => {
const id = opts.id || `obj${nextID()}`;
const keys = opts.keys || <K[]>Object.keys(src);
const _opts: Partial<SubscriptionOpts<any, any>> =
opts.dedupe !== false
? { xform: dedupe<any>(opts.equiv || ((a, b) => a === b)), ...opts }
: opts;
const streams: any = {};
const res = <StreamObj<T, K>>{
streams,
next(state) {
for (let k of keys) {
streams[k].next(<any>state[k]);
const val = state[k];
streams[k].next(
opts.defaults && val === undefined ? opts.defaults[k] : val
);
}
},
done() {
Expand All @@ -123,7 +157,7 @@ export const fromObject = <T, K extends Keys<T>>(
},
};
for (let k of keys) {
streams[k] = <any>subscription(undefined, optsWithID(`obj-${k}`, opts));
streams[k] = subscription(undefined, { ..._opts, id: `${id}-${k}` });
opts.initial !== false && streams[k].next(<any>src[k]);
}
return res;
Expand Down
47 changes: 40 additions & 7 deletions packages/rstream/test/object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ type Foo = { a?: number; b: string };

describe("fromObject", () => {
it("basic", () => {
const obj = fromObject(<{ a?: number; b: string }>{
a: 1,
b: "foo",
});
const obj = fromObject(
<{ a?: number; b: string }>{
a: 1,
b: "foo",
},
{ id: "test" }
);
assert(obj.streams.a instanceof Subscription);
assert(obj.streams.b instanceof Subscription);
assert(obj.streams.a.id.startsWith("obj-a-"));
assert(obj.streams.b.id.startsWith("obj-b-"));
assert(obj.streams.a.id.startsWith("test-a"));
assert(obj.streams.b.id.startsWith("test-b"));

const acc: any = { a: [], b: [] };
obj.streams.a.subscribe({
Expand All @@ -38,7 +41,7 @@ describe("fromObject", () => {

it("subscriber", () => {
const acc: any = { a: [], b: [] };
const obj = fromObject(<Foo>{}, ["a", "b"], { initial: false });
const obj = fromObject(<Foo>{}, { keys: ["a", "b"], initial: false });
obj.streams.a.subscribe({
next(x) {
acc.a.push(x);
Expand All @@ -63,4 +66,34 @@ describe("fromObject", () => {
assert.equal(obj.streams.a.getState(), State.DONE);
assert.equal(obj.streams.b.getState(), State.DONE);
});

it("defaults & dedupe", () => {
const acc: any = { a: [], b: [] };
const obj = fromObject(<Foo>{}, {
keys: ["a", "b"],
initial: false,
defaults: { a: 0 },
});
obj.streams.a.subscribe({
next(x) {
acc.a.push(x);
},
});
obj.streams.b.subscribe({
next(x) {
acc.b.push(x);
},
});

obj.next({ a: 1, b: "foo" });
obj.next({ b: "bar" });
obj.next({ a: 0, b: "bar" });
obj.next({ a: 2, b: "bar" });
obj.next({ a: 2, b: "baz" });
obj.next({ b: "baz" });
assert.deepEqual(acc, {
a: [1, 0, 2, 0],
b: ["foo", "bar", "baz"],
});
});
});

0 comments on commit 975f74c

Please sign in to comment.