@thi.ng/rstream

This project is part of the @thi.ng/umbrella monorepo.

About

Lightweight reactive multi-tap streams and transducer based transformation pipeline constructs, written in TypeScript.

This library provides & uses four key building blocks for reactive programming:

  • Stream sources: event targets, iterables, timers, promises, watches, workers, CSP channels, custom...
  • Subscriptions: chained stream processors, each subscribable (one-to-many) itself
  • Transducers: stream transformers, either as individual subscription or to transform values for a single subscription. See @thi.ng/transducers for 100+ composable operators.
  • Recursive teardown: Whenever possible, any unsubscription initiates cleanup and propagates to parent(s).

Using these building blocks, a growing number of high-level operations are provided too:

Stream creation

stream() creates a new Stream instance, optionally with a given StreamSource function and/or ID. If a src function is provided, it is not called until the first subscriber attaches to the stream, and it then receives the Stream instance as its single argument. If the function returns another function, that function is used for cleanup when the stream is cancelled, e.g. once the last subscriber has unsubscribed. Streams are intended as (primarily async) data sources in a dataflow graph and are the primary construct behind the various from*() functions provided by the package. However, streams can also be triggered manually (from outside the stream), in which case the user should call stream.next() to cause value propagation.

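import * as rs from "@thi.ng/rstream";

// stream with a source function (only called once the first subscriber attaches)
const a = rs.stream((s) => {
    s.next(1);
    s.next(2);
    s.done();
});
a.subscribe(rs.trace("a"));
// a 1
// a 2
// a done

// as reactive value mechanism
const b = rs.stream();
// or alternatively
// const b = rs.subscription();

b.subscribe(rs.trace("b1"));
b.subscribe(rs.trace("b2"));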

// external trigger
b.next(42);
// b1 42
// b2 42

Streams (like Subscriptions) implement the @thi.ng/api IDeref interface which provides read access to a stream's last received value. This is useful for various purposes, e.g. in combination with @thi.ng/hdom, which supports direct embedding of streams (i.e. their values) into UI components (and will be deref'd automatically). If the stream has not yet emitted a value or if the stream is done, it will deref to undefined.
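
The lifecycle described above (source function started by the first subscriber, cleanup after the last one leaves, deref() returning the last value) can be illustrated with a small stand-alone model. This is only a sketch under simplifying assumptions, not the package's implementation: LazyStream, Listener, Cleanup and Source are hypothetical names, and the source here receives an emit callback rather than the stream instance.

```typescript
// Simplified model of the described lifecycle, NOT the @thi.ng/rstream code.
type Listener<T> = (x: T) => void;
type Cleanup = () => void;
type Source<T> = (emit: Listener<T>) => Cleanup;

class LazyStream<T> {
    private subs = new Set<Listener<T>>();
    private src: Source<T> | undefined;
    private cleanup: Cleanup | undefined = undefined;
    private last: T | undefined = undefined;

    constructor(src?: Source<T>) {
        this.src = src;
    }

    // read access to the most recently emitted value (cf. IDeref)
    deref(): T | undefined {
        return this.last;
    }

    // manual trigger, also used by the source function
    next(x: T): void {
        this.last = x;
        for (const sub of [...this.subs]) sub(x);
    }

    subscribe(sub: Listener<T>): () => void {
        this.subs.add(sub);
        // the source only starts once the first subscriber has attached
        if (this.subs.size === 1 && this.src) {
            this.cleanup = this.src((x) => this.next(x));
        }
        return () => {
            this.subs.delete(sub);
            // last subscriber gone: run the cleanup function returned by the source
            if (this.subs.size === 0 && this.cleanup) {
                this.cleanup();
                this.cleanup = undefined;
            }
        };
    }
}

const log: string[] = [];
const ticks = new LazyStream<number>((emit) => {
    log.push("start");
    emit(1);
    emit(2);
    return () => { log.push("cleanup"); };
});

log.push(`before: ${ticks.deref()}`); // nothing emitted yet
const unsub = ticks.subscribe((x) => { log.push(`got ${x}`); });
log.push(`after: ${ticks.deref()}`);
unsub();
// log: ["before: undefined", "start", "got 1", "got 2", "after: 2", "cleanup"]
```

Note that the model keeps the last value after cleanup, whereas the text above states that a finished stream derefs to undefined; that detail is left out to keep the sketch short.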

subscription() creates a new Subscription instance, the fundamental datatype & building block provided by this package (Streams are Subscriptions too). Subscriptions:

  • can be linked into directed graphs (if async, not necessarily DAGs)
  • can be transformed using transducers (incl. early termination)
  • can have any number of subscribers (optionally each w/ their own transducer)
  • recursively unsubscribe themselves from their parent after their last subscriber has unsubscribed
  • go into a non-recoverable error state if NONE of the subscribers has an error handler itself
  • implement the @thi.ng/api IDeref interface

// as reactive value mechanism (same as with stream() above)
// (tx refers to the @thi.ng/transducers package)
const s = rs.subscription();
s.subscribe(rs.trace("s1"));
s.subscribe(rs.trace("s2"), tx.filter((x) => x > 25));

// external trigger
s.next(23);
// s1 23
s.next(42);
// s2 42
// s1 42

Other stream creation helpers

Stream merging

merge() - unsorted merge from multiple inputs (dynamic add/remove)

diagram

Returns a new StreamMerge instance, a subscription type consuming values from multiple inputs and passing them on to any subscribers. Input streams can be added and removed dynamically. By default, StreamMerge calls done() when the last active input is done, but this behavior can be overridden via the close option (set it to false).

merge({
    // input streams w/ different frequencies
    src: [
        fromIterable([1, 2, 3], 10),
        fromIterable([10, 20, 30], 21),
        fromIterable([100, 200, 300], 7)
    ]
}).subscribe(trace());
// 100
// 1
// 200
// 10
// 2
// 300
// 3
// 20
// 30

Use the labeled() transducer for each input to create a stream of labeled values and track their provenance:

merge({
    src: [
        fromIterable([1, 2, 3]).transform(labeled("a")),
        fromIterable([10, 20, 30]).transform(labeled("b")),
    ]
}).subscribe(trace());
// ["a", 1]
// ["b", 10]
// ["a", 2]
// ["b", 20]
// ["a", 3]
// ["b", 30]

See StreamMergeOpts for further reference of the various behavior options.
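
To make the completion rule concrete, here is a minimal stand-alone model of an unsorted merge with dynamic inputs and a close flag. It is a sketch of the behavior as described in this section, not the StreamMerge implementation; MergeModel, Sink and inputDone() are hypothetical names, and what happens when the last input is removed (rather than finished) is deliberately left out.

```typescript
// Simplified model of the merge behavior described above, NOT library code.
interface Sink<T> {
    next(x: T): void;
    done(): void;
}

class MergeModel<T> {
    private inputs = new Set<string>();
    private sink: Sink<T>;
    private close: boolean;

    constructor(sink: Sink<T>, close = true) {
        this.sink = sink;
        this.close = close;
    }

    // inputs can be registered and removed at any time
    add(id: string): void {
        this.inputs.add(id);
    }

    remove(id: string): void {
        this.inputs.delete(id);
    }

    // values from registered inputs are passed on in arrival order
    next(id: string, x: T): void {
        if (this.inputs.has(id)) this.sink.next(x);
    }

    // an input signals completion; the merge only completes
    // after the last active input, and only if `close` is true
    inputDone(id: string): void {
        if (!this.inputs.delete(id)) return;
        if (this.close && this.inputs.size === 0) this.sink.done();
    }
}

const events: string[] = [];
const merged = new MergeModel<number>({
    next: (x) => { events.push(`value ${x}`); },
    done: () => { events.push("done"); },
});

merged.add("a");
merged.add("b");
merged.next("b", 10);
merged.next("a", 1);
merged.inputDone("a"); // "b" is still active, so no done() yet
merged.next("b", 20);
merged.inputDone("b"); // last active input finished
// events: ["value 10", "value 1", "value 20", "done"]
```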

sync() - synchronized merge and labeled tuple objects

diagram

Similar to StreamMerge above, but with extra synchronization of inputs. Before emitting any new values, StreamSync collects values until at least one has been received from each input. Once that's the case, the collected values are sent as a labeled tuple object to downstream subscribers. Each value in an emitted tuple object is stored under its input stream's ID. Only the last value received from each input is passed on. After the initial tuple has been emitted, you can choose between two possible behaviors:

  1. Any future change in any input will produce a new result tuple. These tuples will retain the most recently read values from other inputs. This behavior is the default and illustrated in the above schematic.
  2. If the reset option is true, every input will have to provide at least one new value again until another result tuple is produced.

Any done inputs are automatically removed. By default, StreamSync calls done() when the last active input is done, but this behavior can be overridden via the close constructor option (set to false).

const a = rs.stream();
const b = rs.stream();
const s = rs.sync({ src: { a, b } }).subscribe(rs.trace("result: "));
a.next(1);
b.next(2);
// result: { a: 1, b: 2 }

Input streams can be added and removed dynamically and the emitted tuple size adjusts to the current number of inputs (the next time a value is received from any input).

If the reset option is enabled, the last emitted tuple is allowed to be incomplete, by default. To only allow complete tuples, also set the all option to false.

The synchronization is done via the partitionSync() transducer from the @thi.ng/transducers package. See this function's docs for further details.

See StreamSyncOpts for further reference of the various behavior options.
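
The two behaviors (default vs. reset) can be compared side by side with a small stand-alone model. This is a sketch of the rules as stated above, not the partitionSync() or StreamSync implementation; makeSync and Tuple are hypothetical names, and dynamic inputs as well as the all option are not modeled.

```typescript
// Simplified model of the synchronization rules described above, NOT library code.
type Tuple = Record<string, unknown>;

function makeSync(ids: string[], reset = false) {
    let current: Tuple = {};
    // inputs which still have to deliver a value before the next tuple
    const pending = new Set(ids);
    return (id: string, value: unknown): Tuple | undefined => {
        if (!ids.includes(id)) return undefined;
        current[id] = value; // only the latest value per input is kept
        pending.delete(id);
        if (pending.size > 0) return undefined;
        const result = { ...current };
        if (reset) {
            // every input has to provide a new value again
            current = {};
            ids.forEach((i) => pending.add(i));
        }
        return result;
    };
}

// default: after the first tuple, any change produces a new tuple
const push = makeSync(["a", "b"]);
const out = [push("a", 1), push("a", 2), push("b", 10), push("a", 3)];
// out: [undefined, undefined, { a: 2, b: 10 }, { a: 3, b: 10 }]

// reset: a new tuple requires fresh values from all inputs
const pushReset = makeSync(["a", "b"], true);
const outReset = [pushReset("a", 1), pushReset("b", 2), pushReset("a", 3), pushReset("b", 4)];
// outReset: [undefined, { a: 1, b: 2 }, undefined, { a: 3, b: 4 }]
```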

Stream splitting

pubsub() - topic based splitting

diagram

Topic based stream splitter. Applies the topic function to each received value and only forwards the value to the child subscriptions registered for the returned topic. The actual topic (the return value of the topic fn) can be of any type apart from undefined. Complex topics (e.g. objects / arrays) are allowed and are matched against registered topics using @thi.ng/equiv by default (customizable via the equiv option). Each topic can have any number of subscribers.

If a transducer is specified for the PubSub, it is always applied prior to passing the input to the topic function. I.e. in this case the topic function will receive the transformed inputs.

PubSub supports dynamic topic subscriptions and unsubscriptions via subscribeTopic() and unsubscribeTopic(). However, the standard subscribe() / unsubscribe() methods are NOT supported (since meaningless here) and will throw an error! unsubscribe() can only be called WITHOUT argument to unsubscribe the entire PubSub instance (incl. all topic subscriptions) from the parent stream.
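
The topic dispatch described above can be sketched as a small stand-alone model. It is not the PubSub implementation: TopicSplitter, Handler and Msg are hypothetical names, handlers are plain functions instead of subscriptions, and a JSON string comparison stands in for the structural equality check that @thi.ng/equiv would provide.

```typescript
// Simplified model of topic based splitting, NOT the PubSub implementation.
type Handler<T> = (x: T) => void;

class TopicSplitter<T, K> {
    private topics: Array<{ topic: K; handlers: Handler<T>[] }> = [];
    private topicFn: (x: T) => K;
    private equiv: (a: K, b: K) => boolean;

    constructor(
        topicFn: (x: T) => K,
        // assumption: JSON comparison as stand-in for a real equivalence check
        equiv: (a: K, b: K) => boolean = (a, b) => JSON.stringify(a) === JSON.stringify(b)
    ) {
        this.topicFn = topicFn;
        this.equiv = equiv;
    }

    private find(topic: K) {
        return this.topics.find((t) => this.equiv(t.topic, topic));
    }

    subscribeTopic(topic: K, handler: Handler<T>): void {
        const entry = this.find(topic);
        if (entry) entry.handlers.push(handler);
        else this.topics.push({ topic, handlers: [handler] });
    }

    unsubscribeTopic(topic: K, handler: Handler<T>): void {
        const entry = this.find(topic);
        if (entry) entry.handlers = entry.handlers.filter((h) => h !== handler);
    }

    next(x: T): void {
        const entry = this.find(this.topicFn(x));
        // values without a matching topic subscription are dropped
        if (entry) entry.handlers.forEach((h) => h(x));
    }
}

interface Msg {
    kind: string;
    level: string;
    text: string;
}

const seen: string[] = [];
// complex (array) topics are matched by value, not by reference
const ps = new TopicSplitter<Msg, [string, string]>((msg) => [msg.kind, msg.level]);
const onDbError = (msg: Msg) => { seen.push(`db error: ${msg.text}`); };
ps.subscribeTopic(["db", "error"], onDbError);
ps.subscribeTopic(["net", "info"], (msg) => { seen.push(`net info: ${msg.text}`); });

ps.next({ kind: "db", level: "error", text: "timeout" });
ps.next({ kind: "net", level: "info", text: "connected" });
ps.next({ kind: "db", level: "info", text: "ignored" }); // no subscriber for this topic
ps.unsubscribeTopic(["db", "error"], onDbError);
ps.next({ kind: "db", level: "error", text: "also ignored" });
// seen: ["db error: timeout", "net info: connected"]
```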

bisect() - splitting via predicate

Returns a new PubSub instance using the given predicate pred as boolean topic function and a & b as subscribers for truthy (a) and falsy (b) values.

rs.fromIterable([1, 2, 3, 4]).subscribe(
  rs.bisect(
    (x) => !!(x & 1),
    rs.trace("odd"),
    rs.trace("even")
  )
);
// odd 1
// even 2
// odd 3
// even 4
// odd done
// even done

If a or b need to be subscribed to directly, then a / b MUST be first created as Subscription (if not already) and a reference kept prior to calling bisect().

const odd = rs.subscription();
const even = rs.subscription();
odd.subscribe(rs.trace("odd"));
odd.subscribe(rs.trace("odd x10"), tx.map((x) => x * 10));
even.subscribe(rs.trace("even"));

rs.fromIterable([1, 2, 3, 4]).subscribe(rs.bisect((x) => !!(x & 1), odd, even));
// odd x10 10
// odd 1
// even 2
// odd x10 30
// odd 3
// even 4
// odd done
// odd x10 done
// even done

Side-chaining

sidechainPartition() - chunks input, controlled by sidechain

diagram

Buffers values from src until side chain fires, then emits buffer (unless empty) and repeats process until either input is done. By default, the value read from the side chain is ignored, however the optional predicate can be used to only trigger for specific values / conditions.

// merge various event streams
merge({
    src: [
        fromEvent(document, "mousemove"),
        fromEvent(document, "mousedown"),
        fromEvent(document, "mouseup")
    ]
})
// queue event processing to only execute during the
// requestAnimationFrame cycle (RAF)
.subscribe(sidechainPartition(fromRAF()))
.subscribe(trace())
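
The buffering rule itself (collect until the side chain fires, emit only non-empty buffers) can be shown without any event sources. The following is a stand-alone sketch, not the package's implementation; makePartition and its next()/side() methods are hypothetical names introduced for illustration.

```typescript
// Simplified model of side-chain controlled chunking, NOT library code.
function makePartition<T, S>(
    emit: (chunk: T[]) => void,
    // optional predicate on the side chain value, by default every value triggers
    pred: (sideValue: S) => boolean = () => true
) {
    let buf: T[] = [];
    return {
        // main input: values are only buffered
        next(x: T): void {
            buf.push(x);
        },
        // side chain: flush the buffer, unless it is empty
        side(sideValue: S): void {
            if (pred(sideValue) && buf.length > 0) {
                emit(buf);
                buf = [];
            }
        },
    };
}

const chunks: string[][] = [];
const part = makePartition<string, number>((chunk) => { chunks.push(chunk); });
part.side(0); // buffer is empty, nothing emitted
part.next("mousemove");
part.next("mousedown");
part.side(1);
part.next("mouseup");
part.side(2);
// chunks: [["mousemove", "mousedown"], ["mouseup"]]
```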

sidechainToggle() - toggles input, controlled by sidechain

diagram

Filters values from input based on values received from side chain. By default, the value read from the side chain is ignored, however the optional predicate can be used to only trigger for specific values/conditions. Every time the predicate fn returns true, the filter will be toggled on/off. Whilst switched off, no input values will be forwarded.

// use slower interval stream to toggle main stream on/off
fromInterval(500)
  .subscribe(sidechainToggle(fromInterval(1000)))
  .subscribe(trace());
// 0
// 3
// 4
// 7
// 8
...
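
The listing above can be traced with a small stand-alone model using a simulated clock. This is a sketch, not the package's implementation: makeToggle and its initial parameter are hypothetical, and the model assumes that the filter starts switched on and that the side chain is processed before the main input whenever both fire at the same time.

```typescript
// Simplified model of side-chain controlled toggling, NOT library code.
function makeToggle<T, S>(
    emit: (x: T) => void,
    pred: (sideValue: S) => boolean = () => true,
    initial = true // assumption: filter starts switched on
) {
    let isOpen = initial;
    return {
        // main input: only forwarded while switched on
        next(x: T): void {
            if (isOpen) emit(x);
        },
        // side chain: flips the state whenever the predicate holds
        side(sideValue: S): void {
            if (pred(sideValue)) isOpen = !isOpen;
        },
    };
}

const passed: number[] = [];
const toggle = makeToggle<number, number>((x) => { passed.push(x); });

// simulated clock: main input every 500ms, side chain every 1000ms
let counter = 0;
for (let t = 500; t <= 4500; t += 500) {
    // assumption: side chain fires first at coinciding times
    if (t % 1000 === 0) toggle.side(t);
    toggle.next(counter++);
}
// passed: [0, 3, 4, 7, 8]
```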

Other subscription ops

  • postWorker - send values to workers (incl. optional worker instantiation)
  • resolve - resolve on-stream promises
  • trace - debug helper
  • transduce - transduce or just reduce an entire stream into a promise

Support packages

Conceptual differences to RxJS

(No value judgements implied - there's room for both approaches!)

  • Streams are not the same as Observables, i.e. stream sources are NOT (and often just cannot be) re-run for each new sub added. Only the first sub is guaranteed to receive all values. Subs added at a later time MIGHT not receive earlier emitted values, but only the most recently emitted one and any future values.
  • Every subscription supports any number of subscribers, which can be added/removed at any time
  • Every unsubscription recursively triggers upstream unsubscriptions (provided a parent has no other active child subscriptions)
  • Every subscription can have its own transducer transforming incoming values (possibly into multiple new ones)
  • Transducers can create streams themselves (only for merge() / sync())
  • Transducers can cause early stream termination and subsequent unwinding
  • Values can be manually injected into the stream pipeline / graph at any point
  • Every Stream also is a subscription
  • Unhandled errors in subscriptions will move subscription into error state and cause unsubscription from parent (if any). Unhandled errors in stream sources will cancel the stream.
  • Much smaller API surface since most common & custom operations can be solved via available transducers. Therefore less need to provide specialized functions (map / filter etc.) and more flexibility in terms of composing new operations.
  • IMHO less confusing naming / terminology (only streams (producers) & subscriptions (consumers))

Installation

yarn add @thi.ng/rstream

Dependencies

Usage examples

Several demos in this repo's /examples directory use this package.

A small selection:

Realtime crypto candle chart

screenshot

Source | Live version

Interactive SVG grid generator

screenshot

Source | Live version

Mouse gesture analysis

screenshot

Source | Live version

Declarative dataflow graph

This demo uses the @thi.ng/rstream-graph support package.

Source | Live version

@thi.ng/hdom benchmark

The FPS counter canvas component used in this benchmark is driven by this package.

Source | Live version

Basic usage

This example uses this package for reactive state values and @thi.ng/transducers-hdom to achieve push-based DOM updates.

Live demo | standalone example

import { fromInterval, stream, sync } from "@thi.ng/rstream";
import { updateDOM } from "@thi.ng/transducers-hdom";
import * as tx from "@thi.ng/transducers";

// root component function
const app = ({ ticks, clicks }) =>
    ["div",
        `${ticks} ticks & `,
        ["a",
            { href: "#", onclick: () => clickStream.next(0)},
            `${clicks} clicks`]
    ];

// transformed stream to count clicks
const clickStream = stream().transform(tx.scan(tx.count(-1)));
// seed
clickStream.next(0);

// stream combinator
// waits until all inputs have produced at least one value,
// then updates whenever either input has changed
sync({
    // streams to combine & synchronize
    src: {
        ticks: fromInterval(1000),
        clicks: clickStream,
    },
}).transform(
    // transform tuple into hdom component
    tx.map(app),
    // apply hdom tree to real DOM
    updateDOM({ root: document.body })
);
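
The click counter in the example starts counting at -1 so that the initial seed value brings the displayed count to 0. The following stand-alone sketch mimics that running count in plain TypeScript; it is not the @thi.ng/transducers implementation of scan() / count(), and runningCount is a hypothetical helper.

```typescript
// Plain TypeScript stand-in for a running count starting at a given offset.
function runningCount(start: number) {
    let n = start;
    // every input, regardless of its value, increases the count by one
    return (_input: unknown): number => ++n;
}

const countClick = runningCount(-1);
const results = [
    countClick(0), // seed value: count becomes 0
    countClick(0), // first real click
    countClick(0), // second real click
];
// results: [0, 1, 2]
```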

Authors

  • Karsten Schmidt

License

© 2017 - 2018 Karsten Schmidt // Apache Software License 2.0