Skip to content

Commit

Permalink
refactor(rstream-graph): update prepareNodeInputs/Outputs()
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Sep 19, 2020
1 parent 91b4329 commit 088bf37
Showing 1 changed file with 48 additions and 39 deletions.
87 changes: 48 additions & 39 deletions packages/rstream-graph/src/graph.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { IObjectOf, Tuple } from "@thi.ng/api";
import type { IObjectOf, Path, Tuple } from "@thi.ng/api";
import type { IAtom } from "@thi.ng/atom";
import { isFunction, isPlainObject, isString } from "@thi.ng/checks";
import { illegalArgs } from "@thi.ng/errors";
Expand Down Expand Up @@ -114,24 +114,20 @@ const prepareNodeInputs = (
const res: NodeInputs = {};
if (!ins) return res;
for (let id in ins) {
let s;
const i = ins[id];
if (i.path) {
s = fromViewUnsafe(state, { path: i.path });
} else if (i.stream) {
s = isString(i.stream) ? resolve(i.stream) : i.stream(resolve);
} else if (i.const != null) {
s = fromIterableSync(
[isFunction(i.const) ? i.const(resolve) : i.const],
{ closeIn: CloseMode.NEVER }
);
} else {
illegalArgs(`invalid node input: ${id}`);
}
if (i.xform) {
s = s.subscribe(i.xform, id);
}
res[id] = s;
const src = i.path
? fromViewUnsafe(state, { path: i.path })
: i.stream
? isString(i.stream)
? resolve(i.stream)
: i.stream(resolve)
: i.const !== undefined
? fromIterableSync(
[isFunction(i.const) ? i.const(resolve) : i.const],
{ closeIn: CloseMode.NEVER }
)
: illegalArgs(`invalid node input: ${id}`);
res[id] = i.xform ? src.subscribe(i.xform, id) : src;
}
return res;
};
Expand All @@ -145,31 +141,44 @@ const prepareNodeOutputs = (
const res: NodeOutputs = {};
if (!outs) return res;
for (let id in outs) {
const o = outs[id];
if (isFunction(o)) {
res[id] = o(node, id);
} else if (id == "*") {
res[id] = ((path) =>
node.subscribe(
{
next: (x) => state.resetIn(<any>path, x),
},
{ id: `out-${nodeID}` }
))(o);
} else {
res[id] = ((path, id) =>
node.subscribe(
{
next: (x) => state.resetIn(<any>path, x),
},
map((x) => (x != null ? x[id] : x)),
{ id: `out-${nodeID}-${id}` }
))(o, id);
}
const out = outs[id];
res[id] = isFunction(out)
? out(node, id)
: id == "*"
? nodeOutAll(node, state, nodeID, out)
: nodeOutID(node, state, nodeID, out, id);
}
return res;
};

const nodeOutAll = (
node: ISubscribable<any>,
state: IAtom<any>,
nodeID: string,
path: Path
) =>
node.subscribe(
{
next: (x) => state.resetIn(<any>path, x),
},
{ id: `out-${nodeID}` }
);

const nodeOutID = (
node: ISubscribable<any>,
state: IAtom<any>,
nodeID: string,
path: Path,
id: string
) =>
node.subscribe(
{
next: (x) => state.resetIn(<any>path, x),
},
map((x) => (x != null ? x[id] : x)),
{ id: `out-${nodeID}-${id}` }
);

/**
* Compiles given {@link NodeSpec} and adds it to graph. Returns compiled
* {@link Node} object for the given spec. Throws error if the graph already
Expand Down

0 comments on commit 088bf37

Please sign in to comment.