Skip to content

Commit

Permalink
feat(rstream): initial work on #74, #81, #91, #92, add stream opts
Browse files Browse the repository at this point in the history
- add CommonOpts, TransformableOpts, SubscriptionOpts
- update all ctors & factory fns to accepts options arg
- fix #81 (only keep last received value if `cache` option enabled)
- check from `closeOut` mode and possibly keep stream alive
  after all current subscribers have left (#74)
- update tests (but need to add various new ones)
- add optsWithID() helper to inject ID option if needed
  • Loading branch information
postspectacular committed Aug 17, 2019
1 parent 4b74159 commit e770469
Show file tree
Hide file tree
Showing 26 changed files with 223 additions and 193 deletions.
40 changes: 36 additions & 4 deletions packages/rstream/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,42 @@ export const enum CloseMode {
LAST
}

/**
* Reverse lookup for `State` enums
*/
// export const __State = (<any>exports).State;
export interface CommonOpts {
/**
* Internal ID associated with this stream. If omitted, an
* autogenerated ID will be used.
*/
id: string;
/**
* If false or `CloseMode.NEVER`, the stream stays active even if
* all inputs are done. If true (default) or `CloseMode.LAST`, the
* stream closes when the last input is done. If `CloseMode.FIRST`,
* the instance closes when the first input is done.
*/
closeIn: CloseMode;
/**
* If false or `CloseMode.NEVER`, the stream stays active once there
* are no more subscribers. If true (default) or `CloseMode.LAST`,
* the stream closes when the last subscriber has unsubscribed. If
* `CloseMode.FIRST`, the instance closes when the first subscriber
* disconnects.
*/
closeOut: CloseMode;
/**
* If true (default), stream caches last received value and pushes
* it to new subscriberswhen they subscribe. If false, calling
* `.deref()` on this stream will always return `undefined`.
*/
cache: boolean;
}

export interface TransformableOpts<A, B> extends CommonOpts {
xform: Transducer<A, B>;
}

export interface SubscriptionOpts<A, B> extends TransformableOpts<A, B> {
parent: ISubscribable<A>;
}

export interface ISubscriber<T> {
next: Fn<T, void>;
Expand Down
8 changes: 5 additions & 3 deletions packages/rstream/src/from/atom.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Predicate2 } from "@thi.ng/api";
import { ReadonlyAtom } from "@thi.ng/atom";
import { CommonOpts } from "../api";
import { Stream } from "../stream";
import { nextID } from "../utils/idgen";
import { optsWithID } from "../utils/idgen";

/**
* Yields stream of value changes in given atom / cursor. Attaches watch
Expand Down Expand Up @@ -36,7 +37,8 @@ import { nextID } from "../utils/idgen";
export const fromAtom = <T>(
atom: ReadonlyAtom<T>,
emitFirst = true,
changed?: Predicate2<T>
changed?: Predicate2<T>,
opts?: Partial<CommonOpts>
): Stream<T> =>
new Stream<T>((stream) => {
changed = changed || ((a, b) => a !== b);
Expand All @@ -47,4 +49,4 @@ export const fromAtom = <T>(
});
emitFirst && stream.next(atom.deref());
return () => atom.removeWatch(stream.id);
}, `atom-${nextID()}`);
}, optsWithID("atom-", opts));
26 changes: 17 additions & 9 deletions packages/rstream/src/from/event.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { CommonOpts } from "../api";
import { Stream } from "../stream";
import { nextID } from "../utils/idgen";
import { optsWithID } from "../utils/idgen";

/**
* Creates a new stream of events attached to given element / event
Expand All @@ -8,18 +9,20 @@ import { nextID } from "../utils/idgen";
*
* @param src event target
* @param name event name
* @param opts listener opts
* @param listenerOpts listener opts
* @param streamOpts stream opts
*/
export const fromEvent = (
src: EventTarget,
name: string,
opts: boolean | AddEventListenerOptions = false
listenerOpts: boolean | AddEventListenerOptions = false,
streamOpts?: Partial<CommonOpts>
) =>
new Stream<Event>((stream) => {
let listener = (e: Event) => stream.next(e);
src.addEventListener(name, listener, opts);
return () => src.removeEventListener(name, listener, opts);
}, `event-${name}-${nextID()}`);
src.addEventListener(name, listener, listenerOpts);
return () => src.removeEventListener(name, listener, listenerOpts);
}, optsWithID(`event-${name}-`, streamOpts));

/**
* Same as `fromEvent`, however only supports well-known DOM event
Expand All @@ -32,12 +35,17 @@ export const fromEvent = (
* fromEvent(document.body, "mousemove"); // Stream<Event>
* ```
*
* @see fromEvent
*
* @param src
* @param name
* @param opts
* @param listenerOpts
* @param streamOpts
*/
export const fromDOMEvent = <K extends keyof GlobalEventHandlersEventMap>(
src: EventTarget,
name: K,
opts: boolean | AddEventListenerOptions = false
): Stream<GlobalEventHandlersEventMap[K]> => <any>fromEvent(src, name, opts);
listenerOpts: boolean | AddEventListenerOptions = false,
streamOpts?: Partial<CommonOpts>
): Stream<GlobalEventHandlersEventMap[K]> =>
<any>fromEvent(src, name, listenerOpts, streamOpts);
14 changes: 10 additions & 4 deletions packages/rstream/src/from/interval.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { CloseMode, CommonOpts } from "../api";
import { Stream } from "../stream";
import { nextID } from "../utils/idgen";
import { optsWithID } from "../utils/idgen";

/**
* Returns a new `Stream` which emits a monotonically increasing counter
Expand All @@ -9,17 +10,22 @@ import { nextID } from "../utils/idgen";
*
* @param delay
* @param count
* @param opts
*/
export const fromInterval = (delay: number, count = Infinity) =>
export const fromInterval = (
delay: number,
count = Infinity,
opts?: Partial<CommonOpts>
) =>
new Stream<number>((stream) => {
let i = 0;
stream.next(i++);
let id = setInterval(() => {
stream.next(i++);
if (--count <= 0) {
clearInterval(id);
stream.done();
stream.closeIn !== CloseMode.NEVER && stream.done();
}
}, delay);
return () => clearInterval(id);
}, `interval-${nextID()}`);
}, optsWithID("interval-", opts));
26 changes: 17 additions & 9 deletions packages/rstream/src/from/iterable.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { CloseMode, CommonOpts } from "../api";
import { Stream } from "../stream";
import { nextID } from "../utils/idgen";
import { optsWithID } from "../utils/idgen";

/**
* Creates a new `Stream` of given iterable which asynchronously calls
Expand All @@ -11,22 +12,26 @@ import { nextID } from "../utils/idgen";
*
* @param src
* @param delay
* @param close
* @param opts
*/
export const fromIterable = <T>(src: Iterable<T>, delay = 0, close = true) =>
export const fromIterable = <T>(
src: Iterable<T>,
delay = 0,
opts?: Partial<CommonOpts>
) =>
new Stream<T>((stream) => {
const iter = src[Symbol.iterator]();
const id = setInterval(() => {
let val: IteratorResult<T>;
if ((val = iter.next()).done) {
clearInterval(id);
close && stream.done();
stream.closeIn !== CloseMode.NEVER && stream.done();
} else {
stream.next(val.value);
}
}, delay);
return () => clearInterval(id);
}, `iterable-${nextID()}`);
}, optsWithID("iterable-", opts));

/**
* Creates a new `Stream` of given iterable which synchronously calls
Expand All @@ -36,12 +41,15 @@ export const fromIterable = <T>(src: Iterable<T>, delay = 0, close = true) =>
* be avoided by passing `false` as last argument.
*
* @param src
* @param close
* @param opts
*/
export const fromIterableSync = <T>(src: Iterable<T>, close = true) =>
export const fromIterableSync = <T>(
src: Iterable<T>,
opts?: Partial<CommonOpts>
) =>
new Stream<T>((stream) => {
for (let s of src) {
stream.next(s);
}
close && stream.done();
}, `iterable-${nextID()}`);
stream.closeIn !== CloseMode.NEVER && stream.done();
}, optsWithID("iterable-sync-", opts));
11 changes: 6 additions & 5 deletions packages/rstream/src/from/promise.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import { State } from "../api";
import { CloseMode, CommonOpts, State } from "../api";
import { Stream } from "../stream";
import { nextID } from "../utils/idgen";
import { optsWithID } from "../utils/idgen";

/**
* Yields a single-value stream of the resolved promise and then
* automatically marks itself done. It doesn't matter if the promise
* resolves before the first subscriber has attached.
*
* @param src
* @param opts
*/
export const fromPromise = <T>(src: Promise<T>) => {
export const fromPromise = <T>(src: Promise<T>, opts?: Partial<CommonOpts>) => {
let canceled = false;
let isError = false;
let err: any = {};
Expand All @@ -26,7 +27,7 @@ export const fromPromise = <T>(src: Promise<T>) => {
err = null;
} else {
stream.next(x);
stream.done();
stream.closeIn !== CloseMode.NEVER && stream.done();
}
}
},
Expand All @@ -35,5 +36,5 @@ export const fromPromise = <T>(src: Promise<T>) => {
return () => {
canceled = true;
};
}, `promise-${nextID()}`);
}, optsWithID("promise-", opts));
};
9 changes: 7 additions & 2 deletions packages/rstream/src/from/promises.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { mapcat } from "@thi.ng/transducers";
import { CommonOpts } from "../api";
import { Subscription } from "../subscription";
import { optsWithID } from "../utils/idgen";
import { fromPromise } from "./promise";

/**
Expand Down Expand Up @@ -33,6 +35,9 @@ import { fromPromise } from "./promise";
* @param promises
*/
export const fromPromises = <T>(
promises: Iterable<Promise<T>>
promises: Iterable<Promise<T>>,
opts?: Partial<CommonOpts>
): Subscription<T[], T> =>
fromPromise(Promise.all(promises)).transform(mapcat((x: T[]) => x));
fromPromise(Promise.all(promises), optsWithID("promises-", opts)).transform(
mapcat((x: T[]) => x)
);
14 changes: 9 additions & 5 deletions packages/rstream/src/from/raf.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { isNode } from "@thi.ng/checks";
import { CommonOpts } from "../api";
import { Stream } from "../stream";
import { nextID } from "../utils/idgen";
import { optsWithID } from "../utils/idgen";
import { fromInterval } from "./interval";

/**
Expand All @@ -12,9 +13,9 @@ import { fromInterval } from "./interval";
* Subscribers to this stream will be processed during that same loop
* iteration.
*/
export const fromRAF = () =>
export const fromRAF = (opts?: Partial<CommonOpts>) =>
isNode()
? fromInterval(16)
? fromInterval(16, undefined, opts)
: new Stream<number>((stream) => {
let i = 0;
let isActive = true;
Expand All @@ -23,5 +24,8 @@ export const fromRAF = () =>
isActive && (id = requestAnimationFrame(loop));
};
let id = requestAnimationFrame(loop);
return () => ((isActive = false), cancelAnimationFrame(id));
}, `raf-${nextID()}`);
return () => {
isActive = false;
cancelAnimationFrame(id);
};
}, optsWithID("raf-", opts));
14 changes: 9 additions & 5 deletions packages/rstream/src/from/view.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import { Predicate2 } from "@thi.ng/api";
import { ReadonlyAtom, ViewTransform } from "@thi.ng/atom";
import { View } from "@thi.ng/atom";
import { Path } from "@thi.ng/paths";
import { CommonOpts } from "../api";
import { Stream } from "../stream";
import { nextID } from "../utils/idgen";
import { optsWithID } from "../utils/idgen";

/**
* Similar to `fromAtom()`, but creates an eager derived view for a
Expand Down Expand Up @@ -41,14 +42,14 @@ import { nextID } from "../utils/idgen";
* @param path
* @param tx
* @param equiv
* @param id
* @param opts
*/
export const fromView = <T>(
atom: ReadonlyAtom<any>,
path: Path,
tx?: ViewTransform<T>,
equiv?: Predicate2<any>,
id?: string
opts?: Partial<CommonOpts>
): Stream<T> =>
new Stream<T>((stream) => {
let isActive = true;
Expand All @@ -61,5 +62,8 @@ export const fromView = <T>(
false,
equiv
);
return () => ((isActive = false), view.release());
}, id || `view-${nextID()}`);
return () => {
isActive = false;
view.release();
};
}, optsWithID("view-", opts));
10 changes: 5 additions & 5 deletions packages/rstream/src/from/worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { LOGGER } from "../api";
import { CommonOpts, LOGGER } from "../api";
import { Stream } from "../stream";
import { nextID } from "../utils/idgen";
import { optsWithID } from "../utils/idgen";
import { makeWorker } from "../utils/worker";

/**
Expand All @@ -21,12 +21,12 @@ import { makeWorker } from "../utils/worker";
*
* @param worker
* @param terminate
* @param id
* @param opts
*/
export const fromWorker = <T>(
worker: Worker | Blob | string,
terminate = true,
id?: string
opts?: Partial<CommonOpts>
) => {
const _worker = makeWorker(worker);
return new Stream<T>((stream) => {
Expand All @@ -46,5 +46,5 @@ export const fromWorker = <T>(
_worker.terminate();
}
};
}, id || `worker-${nextID()}`);
}, optsWithID("worker-", opts));
};
Loading

0 comments on commit e770469

Please sign in to comment.