diff --git a/packages/rstream-graph/src/api.ts b/packages/rstream-graph/src/api.ts index c0310fc11b..2699a5e89a 100644 --- a/packages/rstream-graph/src/api.ts +++ b/packages/rstream-graph/src/api.ts @@ -8,17 +8,20 @@ import type { Transducer } from "@thi.ng/transducers"; * {@link @thi.ng/rstream#ISubscribable} using given object of inputs * and node ID. See `node()` and `node1()`. */ -export type NodeFactory = (src: NodeInputs, id: string) => ISubscription; +export type NodeFactory = ( + src: NodeInputs, + id: string +) => ISubscription; export type NodeResolver = Fn; -export type NodeInputs = IObjectOf; -export type NodeOutputs = IObjectOf; +export type NodeInputs = IObjectOf>; +export type NodeOutputs = IObjectOf>; export type Graph = IObjectOf; export interface Node { ins: NodeInputs; outs: NodeOutputs; - node: ISubscription; + node: ISubscription; } /** @@ -103,7 +106,7 @@ export interface NodeSpec { export interface NodeInputSpec { id?: string; path?: Path; - stream?: string | Fn; + stream?: string | Fn>; const?: any | Fn; xform?: Transducer; } @@ -111,6 +114,6 @@ export interface NodeInputSpec { export type NodeOutputSpec = Path | NodeOutputFn; export type NodeOutputFn = ( - node: ISubscription, + node: ISubscription, id: NumOrString -) => ISubscription; +) => ISubscription; diff --git a/packages/rstream-graph/src/graph.ts b/packages/rstream-graph/src/graph.ts index a93c8fe62d..7d713bf1f9 100644 --- a/packages/rstream-graph/src/graph.ts +++ b/packages/rstream-graph/src/graph.ts @@ -115,7 +115,7 @@ const prepareNodeInputs = ( if (!ins) return res; for (let id in ins) { const i = ins[id]; - const src: ISubscription = i.path + const src: ISubscription = i.path ? fromViewUnsafe(state, { path: i.path }) : i.stream ? isString(i.stream) @@ -134,7 +134,7 @@ const prepareNodeInputs = ( const prepareNodeOutputs = ( outs: IObjectOf | undefined, - node: ISubscription, + node: ISubscription, state: IAtom, nodeID: string ) => { @@ -152,7 +152,7 @@ const prepareNodeOutputs = ( }; const nodeOutAll = ( - node: ISubscription, + node: ISubscription, state: IAtom, nodeID: string, path: Path @@ -165,7 +165,7 @@ const nodeOutAll = ( ); const nodeOutID = ( - node: ISubscription, + node: ISubscription, state: IAtom, nodeID: string, path: Path, @@ -260,7 +260,7 @@ export const node = ( inputIDs?: string[], reset = false ): NodeFactory => ( - src: IObjectOf, + src: IObjectOf>, id: string ): StreamSync => ( ensureInputs(src, inputIDs, id), sync({ src, xform, id, reset }) @@ -279,9 +279,9 @@ export const node1 = ( xform?: Transducer, inputID = "src" ): NodeFactory => ( - src: IObjectOf, + src: IObjectOf>, id: string -): ISubscription => { +): ISubscription => { ensureInputs(src, [inputID], id); return src[inputID].subscribe({}, { xform, id }); }; @@ -309,7 +309,7 @@ export const node2 = ( * @param nodeID - */ export const ensureInputs = ( - src: IObjectOf, + src: IObjectOf>, inputIDs: string[] | undefined, nodeID: string ) => {