Skip to content

Commit

Permalink
feat(rstream): fix #8, support infinite StreamMerge's, update ctor
Browse files Browse the repository at this point in the history
BREAKING CHANGE: StreamMerge ctor now accepts an options object
only (`StreamMergeOpts`).
  • Loading branch information
postspectacular committed Feb 18, 2018
1 parent ca1caae commit 4942e2e
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 48 deletions.
18 changes: 10 additions & 8 deletions packages/rstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ setTimeout(()=> raf.done(), 10000);
### Stream merging

```typescript
new rs.StreamMerge([
rs.fromEvent(document, "mousemove"),
rs.fromEvent(document, "mousedown"),
rs.fromEvent(document, "mouseup"),
])
new rs.StreamMerge({
src: [
rs.fromEvent(document, "mousemove"),
rs.fromEvent(document, "mousedown"),
rs.fromEvent(document, "mouseup"),
]
})
// add event transformer
.subscribe(tx.map((e) => [e.type, [e.clientX, e.clientY]]))
// add debug subscription
Expand All @@ -97,7 +99,7 @@ import * as atom from "@thi.ng/atom";
import * as tx from "@thi.ng/transducers";

// central app state / single source of truth
const app = new atom.Atom({ui: {theme: "dark", mode: false}, foo: "bar"});
const app = new atom.Atom({ ui: { theme: "dark", mode: false}, foo: "bar" });

// define some cursors for different UI params
const theme = new atom.Cursor(app, "ui.theme");
Expand Down Expand Up @@ -134,8 +136,8 @@ hist.redo(); // 1st
// theme: light
// { theme: 'light', mode: false }

// update another part of the app state (SPREAD, DON'T MUTATE!)
app.swap((state) => ({...state, session: {user: "asterix"}}));
// update another part of the app state (DON'T MUTATE!)
app.swap((state) => atom.setIn(state, "session.user", "asterix"));
// user: asterix
// { ui: { theme: 'light', mode: false },
// foo: 'bar',
Expand Down
65 changes: 35 additions & 30 deletions packages/rstream/src/stream-merge.ts
Original file line number Diff line number Diff line change
@@ -1,54 +1,53 @@
import { isFunction } from "@thi.ng/checks/is-function";
import { isString } from "@thi.ng/checks/is-string";
import { IID } from "@thi.ng/api/api";
import { Transducer } from "@thi.ng/transducers/api";

import { ISubscribable, State } from "./api";
import { Subscription } from "./subscription";

export interface StreamMergeOpts<A, B> extends IID<string> {
src: Iterable<ISubscribable<A>>;
xform: Transducer<A, B>;
close: boolean;
}

/**
* Subscription type consuming inputs from multiple inputs and passing
* received values on to any subscribers. Input streams can be added and
* removed dynamically. By default, the StreamMerge calls `done()` when
* the last active input is done, but this behavior can be overridden via
* the `close` constructor option (set to `false`).
*/
export class StreamMerge<A, B> extends Subscription<A, B> {

sources: ISubscribable<A>[];
wrappedSources: Subscription<A, any>[];
autoClose: boolean;

constructor(sources: Iterable<ISubscribable<A>>, id?: string);
constructor(xform: Transducer<A, B>, id?: string);
constructor(sources: Iterable<ISubscribable<A>>, xform: Transducer<A, B>, id?: string);
constructor(...args: any[]) {
let id = isString(args[args.length - 1]) ? args.pop() : `streammerge-${Subscription.NEXT_ID++}`;
let src, xform;
switch (args.length) {
case 2:
src = args[0];
xform = args[1];
break;
case 1:
if (isFunction(args[0])) {
xform = args[0];
} else {
src = args[0];
}
break;
default:
throw new Error(`illegal arity ${args.length}`);
}
super(null, xform, null, id);
constructor(opts?: Partial<StreamMergeOpts<A, B>>) {
opts = opts || {};
super(null, opts.xform, null, opts.id || `streammerge-${Subscription.NEXT_ID++}`);
this.sources = [];
this.wrappedSources = [];
if (src) {
for (let s of src) {
this.add(s);
}
this.autoClose = opts.close !== false;
if (opts.src) {
this.addAll(opts.src);
}
}

add(src: ISubscribable<A>) {
this.ensureState();
this.sources.push(src);
this.wrappedSources.push(
src.subscribe({
next: (x) => this.next(x),
done: () => this.markDone(src)
}));
this.sources.push(src);
}

addAll(src: Iterable<ISubscribable<A>>) {
for (let s of src) {
this.add(s);
}
}

remove(src: ISubscribable<A>) {
Expand All @@ -60,6 +59,12 @@ export class StreamMerge<A, B> extends Subscription<A, B> {
}
}

removeAll(src: Iterable<ISubscribable<A>>) {
for (let s of src) {
this.remove(s);
}
}

unsubscribe(sub?: Subscription<B, any>) {
if (!sub) {
for (let s of this.wrappedSources) {
Expand All @@ -86,7 +91,7 @@ export class StreamMerge<A, B> extends Subscription<A, B> {

protected markDone(src: ISubscribable<A>) {
this.remove(src);
if (!this.sources.length) {
if (this.autoClose && !this.sources.length) {
this.done();
}
}
Expand Down
30 changes: 20 additions & 10 deletions packages/rstream/test/stream-merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ describe("StreamMerge", () => {
};

beforeEach(() => {
src = new rs.StreamMerge<number, number>([
rs.fromIterable([1, 2]),
rs.fromIterable([10, 20, 30, 40]),
rs.fromIterable([100, 200, 300])
]);
src = new rs.StreamMerge<number, number>({
src: [
rs.fromIterable([1, 2]),
rs.fromIterable([10, 20, 30, 40]),
rs.fromIterable([100, 200, 300])
]
});
});

it("merges all inputs", (done) => {
Expand All @@ -35,12 +37,20 @@ describe("StreamMerge", () => {
});

it("merges dynamic inputs", (done) => {
src = new rs.StreamMerge([]);
src = new rs.StreamMerge();
src.add(rs.fromIterable([1, 2, 3, 4], 10));
src.add(rs.fromIterable([10, 20], 5));
src.subscribe(check([1, 2, 3, 4, 10, 20], done));
});

it("merges dynamic inputs (synchronous)", (done) => {
src = new rs.StreamMerge({ close: false });
src.subscribe(check([1, 2, 3, 4, 10, 20], done));
src.add(rs.fromIterableSync([1, 2, 3, 4]));
src.add(rs.fromIterableSync([10, 20]));
src.done();
});

it("stops when no more subs", () => {
assert(src.getState() === rs.State.IDLE);
let sub1 = src.subscribe({});
Expand All @@ -52,13 +62,13 @@ describe("StreamMerge", () => {
});

it("applies transducer", (done) => {
src = new rs.StreamMerge<number, number>(
[
src = new rs.StreamMerge<number, number>({
src: [
rs.fromIterable([1, 2]),
rs.fromIterable([10, 20])
],
tx.mapcat((x: number) => [x, x + 1])
);
xform: tx.mapcat((x: number) => [x, x + 1])
});
src.subscribe(check([1, 2, 2, 3, 10, 11, 20, 21], done));
});

Expand Down

0 comments on commit 4942e2e

Please sign in to comment.