Lightweight reactive multi-tap streams and transducer based transformation pipeline constructs, written in TypeScript.
This library provides & uses three key building blocks for reactive programming:
- Stream sources: event targets, iterables, timers, promises, watches, workers, CSP channels, custom...
- Subscriptions: chained stream processors, each subscribable (one-to-many) itself
- Transducers: stream transformers, either as individual subscription or to transform values for a single subscription. See @thi.ng/transducers for 90+ composable operators.
- Recursive teardown: Whenever possible, any unsubscription initiates cleanup and propagates to parent(s).
Using these building blocks, a growing number of high-level operations are provided too:
- fromAtom() - streams from value changes in atoms/cursors
- fromChannel() - CSP channel to stream conversion
- fromEvent() - DOM events
- fromInterval() - interval based counters
- fromIterable() - arrays, iterators / generators
- fromPromise() - single value stream from promis
- fromPromises() - results from multiple promise
- fromRAF() - requestAnimationFrame() counter (w/ node fallback)
- fromView() - derived view changes (see @thi.ng/atom)
- fromWorker() - messages received from worker
- manual / custom - anything else
- merge - unsorted merge from multiple inputs (dynamic add/remove)
- sync - synchronized merge and labeled tuple objects
- bisect - split via predicate
- postWorker - send values to workers (incl. optional worker instantiation)
- resolve - resolve on-stream promises
- sidechainPartition - emits chunks from source, controlled by sidechain stream
- sidechainToggle - toggles source based on signals from sidechain
- trace - debug helper
- transduce - transduce or just reduce an entire stream into a promise
- Subscriptions implement @thi.ng/api's
IDeref
interface and therefore can be used directly in UI components based on @thi.ng/hdom.
Furthermore, the @thi.ng/rstream-log package provides an extensible multi-level, multi-target logging solution based on this library.
(No value judgements implied - there's room for both approaches!)
- Streams are not the same as Observables: I.e. stream sources are NOT (often just cannot) re-run for each new sub added. Only the first sub is guaranteed to receive all values. Subs added at a later time MIGHT not receive earlier emitted values, but only the most recent emitted and any future values)
- Every subscription supports any number of subscribers, which can be added/removed at any time
- Every unsubscription recursively triggers upstream unsubscriptions (provided a parent has no other active child subscriptions)
- Every subscription can have its own transducer transforming incoming values (possibly into multiple new ones)
- Transducers can create streams themselves (only for
merge()
/sync()
) - Transducers can cause early stream termination and subsequent unwinding
- Values can be manually injected into the stream pipeline / graph at any point
- Every Stream also is a subscription
- Unhandled errors in subscriptions will move subscription into error state and cause unsubscription from parent (if any). Unhandled errors in stream sources will cancel the stream.
- Much smaller API surface since most common & custom operations can be solved via available transducers. Therefore less need to provide specialized functions (map / filter etc.) and more flexibility in terms of composing new operations.
- IMHO less confusing naming / terminology (only streams (producers) & subscriptions (consumers))
yarn add @thi.ng/rstream
import * as rs from "@thi.ng/rstream";
import * as tx from "@thi.ng/transducers";
// requestAnimationFrame() event stream (counter)
// (in Node falls back to `fromInterval(16)`)
const raf = rs.fromRAF();
// add subscription w/ a composed transducer computing
// average FPS of last 10 frames
raf.subscribe(
{
next(x) {
console.log(x.toFixed(1), "fps");
}
},
tx.comp(
tx.benchmark(),
tx.movingAverage(10),
tx.map(x => 1000 / x)
)
);
// add another subscription (untransformed)
raf.subscribe(rs.trace());
// stop stream after 10 secs
setTimeout(()=> raf.done(), 10000);
new rs.StreamMerge({
src: [
rs.fromEvent(document, "mousemove"),
rs.fromEvent(document, "mousedown"),
rs.fromEvent(document, "mouseup"),
]
})
// add event transformer
.subscribe(tx.map((e) => [e.type, [e.clientX, e.clientY]]))
// add debug subscription
.subscribe(rs.trace());
// ["mousedown", [472, 195]]
// ["mousemove", [472, 197]]
// ["mouseup", [473, 198]]
// ["mousemove", [485, 204]]
// ...
This example uses synchronized stream merging to implement a dataflow graph whose leaf inputs (and their changes) are sourced from a central immutable atom.
import { Atom } from "@thi.ng/atom/atom";
import { map } from "@thi.ng/transducers";
import * as rs from "@thi.ng/rstream";
// create mutable/watchable container for graph inputs
const graph = new Atom<any>({
a1: { ports: { a: 1, b: 2 } },
a2: { ports: { b: 10 } },
a3: { ports: { c: 0 } },
});
// create a synchronized stream merge from given inputs
const adder = (src) =>
rs.sync({
src,
// define transducer for merged tuple objects
// summing all values in each tuple
// (i.e. the values from the input streams)
xform: map((ports) => {
let sum = 0;
for (let p in ports) {
sum += ports[p];
}
return sum;
}),
// reset=false will only synchronize *all* inputs for the
// very 1st merged tuple, then emit updated ones when *any*
// input has changed with other input values in the tuple
// remaining the same
reset: false
});
// define first dataflow node
// `fromView()` creates a stream of value changes
// for given value path in the above atom
const a1 = adder([
rs.fromView(graph, "a1.ports.a"),
rs.fromView(graph, "a1.ports.b"),
]);
// this node computes sum of:
// - prev node
// - view of a2.ports.b value in atom
// - for fun, another external stream (iterable)
const a2 = adder([
a1,
rs.fromView(graph, "a2.ports.b"),
rs.fromIterable([0, 1, 2]),
]);
// last node computes sum of the other 2 nodes
const a3 = adder([a1, a2]);
// add a console.log sub to see results
a3.subscribe(rs.trace("result:"));
// result: 16
// result: 17
// result: 18
// value update in atom triggers recomputation
// of impacted graph nodes (and only those!)
setTimeout(() => graph.resetIn("a2.ports.b", 100), 100);
// result: 108
import * as atom from "@thi.ng/atom";
import * as tx from "@thi.ng/transducers";
// central app state / single source of truth
const app = new atom.Atom({ ui: { theme: "dark", mode: false}, foo: "bar" });
// define some cursors for different UI params
const theme = new atom.Cursor(app, "ui.theme");
const mode = new atom.Cursor(app, "ui.mode");
// create streams of cursor value changes
rs.fromAtom(theme).subscribe(rs.trace("theme:"));
// with transducer
rs.fromAtom(mode).subscribe(rs.trace("mode:"), tx.map(mode => mode ? "advanced" : "basic"));
// another one for an hitherto unknown value in app state (via derived view)
rs.fromView(app, "session.user").subscribe(rs.trace("user:"));
// attach history only to `ui` branch
// undo/redo will not record/change other keys in the atom
const hist = new atom.History(new atom.Cursor(app, "ui"));
hist.record(); // record current snapshot
theme.reset("light");
// theme: light
hist.record();
mode.swap(mode => !mode); // toggle mode
// mode: advanced
hist.undo(); // 1st
// mode: basic
// { theme: 'light', mode: false }
hist.undo(); // 2nd
// theme: dark
// { theme: 'dark', mode: false }
hist.redo(); // 1st
// theme: light
// { theme: 'light', mode: false }
// update another part of the app state (DON'T MUTATE!)
app.swap((state) => atom.setIn(state, "session.user", "asterix"));
// user: asterix
// { ui: { theme: 'light', mode: false },
// foo: 'bar',
// session: { user: 'asterix' } }
hist.redo(); // redo 2nd time
// mode: advanced
// { theme: 'light', mode: true }
// verify history redo did not destroy other keys
app.deref();
// { ui: { theme: 'light', mode: true },
// foo: 'bar',
// session: { user: 'asterix' } }
TODO more to come... see tests for now!
- Karsten Schmidt
© 2017 - 2018 Karsten Schmidt // Apache Software License 2.0