Skip to content

Commit

Permalink
feat(rstream): add fromObject(), add docs & tests
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed May 15, 2020
1 parent 97c3013 commit 5e854eb
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 0 deletions.
74 changes: 74 additions & 0 deletions packages/rstream/src/from/object.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { Keys } from "@thi.ng/api";
import { Subscription, subscription } from "../subscription";
import { CommonOpts } from "../api";
import { optsWithID } from "../utils/idgen";

export type StreamObj<T, K extends Keys<T>> = {
[id in K]-?: Subscription<T[id], T[id]>;
} & {
__next(x: T): void;
__done(): void;
};

/**
* Takes an arbitrary object `src` and optional array of `keys` (else
* selects all by default). Creates a new object and for each key
* creates a new stream, seeded with the key's value in `src`. Returns
* new object of streams.
*
* @remarks
* In addition to the given `keys`, the following special functions will
* be added to the result object:
*
* - `__next()` - takes a new object of same type as `src` and feeds new
* values for each key/prop into its respective stream.
* - `__done()` - calls {@link ISubscriber.done} on all streams
*
* The optional `opts` arg is used to specify shared options for *all*
* streams. If the `id` option isn't provided, each stream will get an
* autogenerated ID in the form `obj-${keyname}-${counter}`.
*
* @example
* ```ts
* type Foo = { a?: number; b: string; };
*
* const streams = fromObject(<Foo>{ a: 1, b: "foo" })
*
* streams.a.subscribe(trace("a"))
* // a 1
* streams.b.subscribe(trace("b"))
* // b foo
*
* streams.__next({ b: "bar" })
* // a undefined
* // b bar
* ```
*
*
* @param src
* @param keys
* @param opts
*/
export const fromObject = <T, K extends Keys<T>>(
src: T,
keys: K[] = <any>Object.keys(src),
opts?: Partial<CommonOpts>
) => {
const dest = <StreamObj<T, K>>{
__next(state) {
for (let k of keys) {
dest[k].next(<any>state[k]);
}
},
__done() {
for (let k of keys) {
dest[k].done();
}
},
};
for (let k of keys) {
dest[k] = <any>subscription(undefined, optsWithID(`obj-${k}`, opts));
dest[k].next(<any>src[k]);
}
return dest;
};
1 change: 1 addition & 0 deletions packages/rstream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export * from "./from/atom";
export * from "./from/event";
export * from "./from/interval";
export * from "./from/iterable";
export * from "./from/object";
export * from "./from/promise";
export * from "./from/promises";
export * from "./from/raf";
Expand Down
36 changes: 36 additions & 0 deletions packages/rstream/test/object.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import * as assert from "assert";
import { fromObject, State, Subscription } from "../src/index";

describe("fromObject", () => {
it("basic", () => {
const streams = fromObject(<{ a?: number; b: string }>{
a: 1,
b: "foo",
});
assert(streams.a instanceof Subscription);
assert(streams.b instanceof Subscription);
assert(streams.a.id.startsWith("obj-a-"));
assert(streams.b.id.startsWith("obj-b-"));

const acc: any = { a: [], b: [] };
streams.a.subscribe({
next(x) {
acc.a.push(x);
},
});
streams.b.subscribe({
next(x) {
acc.b.push(x);
},
});
streams.__next({ a: 2, b: "bar" });
streams.__next({ b: "baz" });
streams.__done();
assert.deepEqual(acc, {
a: [1, 2, undefined],
b: ["foo", "bar", "baz"],
});
assert.equal(streams.a.getState(), State.DONE);
assert.equal(streams.b.getState(), State.DONE);
});
});

0 comments on commit 5e854eb

Please sign in to comment.