Skip to content

Commit

Permalink
refactor(rstream): update forkJoin() & tween() impls (StreamSync)
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Jul 26, 2020
1 parent c9d983d commit 08ca3e1
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 23 deletions.
54 changes: 33 additions & 21 deletions packages/rstream/src/forkjoin.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import { comp, map, mapcat, range } from "@thi.ng/transducers";
import type { ArrayLikeIterable, Fn, Fn3, IObjectOf } from "@thi.ng/api";
import {
assocObj,
comp,
map,
mapcat,
range,
transduce,
} from "@thi.ng/transducers";
import type { CommonOpts, ISubscribable, ITransformable } from "./api";
import { sync } from "./stream-sync";
import { tunnel } from "./subs/tunnel";
import { Subscription } from "./subscription";
import type { ArrayLikeIterable, Fn, Fn3 } from "@thi.ng/api";
import type { CommonOpts, ITransformable } from "./api";

export interface ForkJoinOpts<IN, MSG, RES, OUT> extends Partial<CommonOpts> {
/**
Expand Down Expand Up @@ -112,24 +119,29 @@ export const forkJoin = <IN, MSG, RES, OUT>(
): Subscription<any, OUT> => {
const numWorkers = opts.numWorkers || navigator.hardwareConcurrency || 4;
const workerIDs = range(numWorkers);
return sync<RES, OUT>({
src: [
...map(
(id) =>
opts.src
.transform(map((x) => opts.fork(id, numWorkers, x)))
.subscribe(
tunnel<MSG, RES>({
src: opts.worker,
transferables: opts.transferables,
interrupt: opts.interrupt === true,
terminate: opts.terminate,
id: String(id),
})
),
workerIDs
),
],
return sync({
src: transduce<
number,
[string, ISubscribable<RES>],
IObjectOf<ISubscribable<RES>>
>(
map((id) => [
String(id),
opts.src
.transform(map((x) => opts.fork(id, numWorkers, x)))
.subscribe(
tunnel<MSG, RES>({
src: opts.worker,
transferables: opts.transferables,
interrupt: opts.interrupt === true,
terminate: opts.terminate,
id: String(id),
})
),
]),
assocObj(),
workerIDs
),
xform: comp(
// form result tuple in original order
map((results) => [...map((id) => results[id], workerIDs)]),
Expand Down
4 changes: 2 additions & 2 deletions packages/rstream/src/tween.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export const tween = <T>(
stop?: Fn2<T, T, boolean>,
clock?: ISubscribable<any> | number
) =>
sync<any, { src: T }>({
sync({
src: {
src,
_:
Expand Down Expand Up @@ -105,7 +105,7 @@ export const tweenNumber = (
eps = 1e-3,
clock?: ISubscribable<any> | number
) =>
tween<number>(
tween(
src,
init,
(a, b) => a + (b - a) * speed,
Expand Down

0 comments on commit 08ca3e1

Please sign in to comment.