Skip to content

Commit

Permalink
Merge branch 'feature/rstream-refactor' into develop
Browse files Browse the repository at this point in the history
* feature/rstream-refactor: (23 commits)
  docs(rstream): update readme, add dataflow example
  docs(rstream): update readme
  feat(rstream): add IDeref impl for Subscription
  feat(rstream): add merge()/sync() ctor wrappers
  feat(rstream): add transduce(), update re-exports
  test(rstream): add StreamSync & Subscription tests
  refactor(rstream): simplify unsubscribe() logic
  docs(rstream): add Cache deprecation docstring
  feat(rstream): Subscription stores last value and passes to new subs
  refactor(rstream): update & StreamMerge/SyncOpts, minor fixes StreamSync
  feat(rstream): fix #6 update StreamMerge to support transduced input streams
  test(rstream): add/update sidechain* tests
  feat(rstream): update Sidechain*.next(), add unsubscribe()
  minor(rstream): fromPromise()
  feat(rstream): add fromView(), update fromAtom() docs, update re-exports
  feat(rstream): update Subscription.unsubscribe()
  fix(rstream): bisect() add downstream impl checks, add tests
  test(rstream): remove obsolete fromPromise error test case
  refactor(rstream): simplify Subscription, update all impls
  refactor(rstream): simplify StreamMerge source handling
  ...
  • Loading branch information
postspectacular committed Mar 21, 2018
2 parents 5bf5ce6 + deb59a1 commit e58c5e0
Show file tree
Hide file tree
Showing 21 changed files with 817 additions and 110 deletions.
174 changes: 156 additions & 18 deletions packages/rstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,98 @@

[![npm (scoped)](https://img.shields.io/npm/v/@thi.ng/rstream.svg)](https://www.npmjs.com/package/@thi.ng/rstream)

Lightweight reactive multi-tap streams and transducer based transformation
pipeline constructs, written in TypeScript.
Lightweight reactive multi-tap streams and transducer based
transformation pipeline constructs, written in TypeScript.

## About

This library provides & uses three key building blocks for reactive
programming:

- **Stream sources**: event targets, iterables, timers, promises,
watches, workers, CSP channels, custom...
- **Subscriptions**: chained stream processors, each subscribable
(one-to-many) itself
- **Transducers**: stream transformers, either as individual
subscription or to transform values for a single subscription. See
[@thi.ng/transducers](https://github.com/thi-ng/umbrella/tree/master/packages/transducers)
for 90+ composable operators.
- **Recursive teardown**: Whenever possible, any unsubscription
initiates cleanup and propagates to parent(s).

Using these building blocks, a growing number of high-level operations
are provided too:

### Stream creation helpers

- [fromAtom()](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/from/atom.ts) - streams from value changes in atoms/cursors
- [fromChannel()](https://github.com/thi-ng/umbrella/tree/master/packages/rstream-csp) - CSP channel to stream conversion
- [fromEvent()](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/from/event.ts) - DOM events
- [fromInterval()](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/from/interval.ts) - interval based counters
- [fromIterable()](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/from/iterable.ts) - arrays, iterators / generators
- [fromPromise()](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/from/promise.ts) - single value stream from promis
- [fromPromises()](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/from/promises.ts) - results from multiple promise
- [fromRAF()](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/from/raf.ts) - requestAnimationFrame() counter (w/ node fallback)
- [fromView()](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/from/view.ts) - derived view changes (see @thi.ng/atom)
- [fromWorker()](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/from/worker.ts) - messages received from worker
- [manual / custom](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/stream.ts) - anything else

# About
### Stream merging

- [merge](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/stream-merge.ts) - unsorted merge from multiple inputs (dynamic add/remove)
- [sync](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/stream-sync.ts) - synchronized merge and labeled tuple objects

This library provides & uses three key building blocks for reactive programming:
### Useful subscription ops

- **Stream sources**: event targets, iterables, timers, promises, watches, workers, CSP channels, custom...
- **Subscriptions**: chained stream processors, each subscribable itself
- **Transducers**: stream transformers, individually or as part of a single subscription, see [@thi.ng/transducers](https://github.com/thi-ng/umbrella/tree/master/packages/transducers).
- [bisect](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/subs/bisect.ts) - split via predicate
- [postWorker](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/subs/post-worker.ts) - send values to workers (incl. optional worker instantiation)
- [resolve](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/subs/resolve.ts) - resolve on-stream promises
- [sidechainPartition](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/subs/sidechain-partition.ts) - emits chunks from source, controlled by sidechain stream
- [sidechainToggle](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/subs/sidechain-toggle.ts) - toggles source based on signals from sidechain
- [trace](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/subs/trace.ts) - debug helper
- [transduce](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/subs/transduce.ts) - transduce or just reduce an entire stream into a promise

Using these building blocks, a growing number of high-level operations are provided too:
### Miscellaneous

- stream merging
- pubsub
- sidechain partitioning (emits chunks from source, controlled by sidechain stream)
- sidechain toggle (toggles source based on signals from sidechain)
- Subscriptions implement @thi.ng/api's `IDeref` interface and therefore
can be used directly in UI components based on
[@thi.ng/hdom](https://github.com/thi-ng/umbrella/tree/master/packages/hdom).

Furthermore, the
[@thi.ng/rstream-log](https://github.com/thi-ng/umbrella/tree/master/packages/rstream-log)
package provides an extensible multi-level, multi-target logging solution based
on this library.

TODO
package provides an extensible multi-level, multi-target logging
solution based on this library.

## Conceptual differences to RxJS

(No value judgements implied - there's room for both approaches!)

- Streams are not the same as Observables: I.e. stream sources are NOT
(often just cannot) re-run for each new sub added. Only the first sub
is guaranteed to receive **all** values. Subs added at a later time
MIGHT not receive earlier emitted values, but only the most recent
emitted and any future values)
- Every subscription supports any number of subscribers, which can be
added/removed at any time
- Every unsubscription recursively triggers upstream unsubscriptions
(provided a parent has no other active child subscriptions)
- Every subscription can have its own transducer transforming
incoming values (possibly into multiple new ones)
- Transducers can create streams themselves (only for `merge()` /
`sync()`)
- Transducers can cause early stream termination and subsequent unwinding
- Values can be manually injected into the stream pipeline / graph at
any point
- Every Stream also is a subscription
- Unhandled errors in subscriptions will move subscription into error
state and cause unsubscription from parent (if any). Unhandled errors
in stream sources will cancel the stream.
- *Much* smaller API surface since most common & custom operations can
be solved via available transducers. Therefore less need to provide
specialized functions (map / filter etc.) and more flexibility in
terms of composing new operations.
- IMHO less confusing naming / terminology (only streams (producers) &
subscriptions (consumers))

## Installation

Expand Down Expand Up @@ -92,6 +160,76 @@ new rs.StreamMerge({
// ...
```

### Dataflow graph example

This example uses [synchronized stream merging](https://github.com/thi-ng/umbrella/tree/master/packages/rstream/src/stream-sync.ts#L19) to implement a dataflow graph whose leaf inputs (and their changes) are sourced from a central immutable [atom](https://github.com/thi-ng/umbrella/tree/master/packages/).

```typescript
import { Atom } from "@thi.ng/atom/atom";
import { map } from "@thi.ng/transducers";
import * as rs from "@thi.ng/rstream";

// create mutable/watchable container for graph inputs
const graph = new Atom<any>({
a1: { ports: { a: 1, b: 2 } },
a2: { ports: { b: 10 } },
a3: { ports: { c: 0 } },
});

// create a synchronized stream merge from given inputs
const adder = (src) =>
rs.sync({
src,
// define transducer for merged tuple objects
// summing all values in each tuple
// (i.e. the values from the input streams)
xform: map((ports) => {
let sum = 0;
for (let p in ports) {
sum += ports[p];
}
return sum;
}),
// reset=false will only synchronize *all* inputs for the
// very 1st merged tuple, then emit updated ones when *any*
// input has changed with other input values in the tuple
// remaining the same
reset: false
});

// define first dataflow node
// `fromView()` creates a stream of value changes
// for given value path in the above atom
const a1 = adder([
rs.fromView(graph, "a1.ports.a"),
rs.fromView(graph, "a1.ports.b"),
]);

// this node computes sum of:
// - prev node
// - view of a2.ports.b value in atom
// - for fun, another external stream (iterable)
const a2 = adder([
a1,
rs.fromView(graph, "a2.ports.b"),
rs.fromIterable([0, 1, 2]),
]);

// last node computes sum of the other 2 nodes
const a3 = adder([a1, a2]);

// add a console.log sub to see results
a3.subscribe(rs.trace("result:"));
// result: 16
// result: 17
// result: 18

// value update in atom triggers recomputation
// of impacted graph nodes (and only those!)
setTimeout(() => graph.resetIn("a2.ports.b", 100), 100);
// result: 108
```

### Central app state atom with reactive undo / redo

```typescript
Expand All @@ -109,8 +247,8 @@ const mode = new atom.Cursor(app, "ui.mode");
rs.fromAtom(theme).subscribe(rs.trace("theme:"));
// with transducer
rs.fromAtom(mode).subscribe(rs.trace("mode:"), tx.map(mode => mode ? "advanced" : "basic"));
// another one for an hitherto unknown value in app state
rs.fromAtom(new atom.Cursor(app, "session.user")).subscribe(rs.trace("user:"));
// another one for an hitherto unknown value in app state (via derived view)
rs.fromView(app, "session.user").subscribe(rs.trace("user:"));

// attach history only to `ui` branch
// undo/redo will not record/change other keys in the atom
Expand Down
16 changes: 9 additions & 7 deletions packages/rstream/src/from/atom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ import { ReadonlyAtom } from "@thi.ng/atom/api";
import { Stream } from "../stream";

/**
* Yields stream of value changes in given atom / cursor.
* Attaches watch to atom and checks for value changes with given `changed`
* predicate (`!==` by default). If the predicate returns truthy result,
* the atom change is emitted on the stream.
* If `emitFirst` is true (default), also emits atom's current value
* when first subscriber attaches to stream.
* Yields stream of value changes in given atom / cursor. Attaches watch
* to atom and checks for value changes with given `changed` predicate
* (`!==` by default). If the predicate returns truthy result, the new
* value is emitted on the stream. If `emitFirst` is true (default),
* also emits atom's current value when first subscriber attaches to
* stream.
*
* See: @thi.ng/atom
* See:
* - fromView()
* - @thi.ng/atom
*
* ```
* db = new Atom({a: 23, b: 88});
Expand Down
57 changes: 57 additions & 0 deletions packages/rstream/src/from/view.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { ReadonlyAtom, ViewTransform } from "@thi.ng/atom/api";
import { View } from "@thi.ng/atom/view";
import { Stream } from "../stream";
import { Path } from "@thi.ng/paths";
import { Predicate2 } from "@thi.ng/api/api";

/**
* Similar to `fromAtom()`, but creates an eager derived view for a
* nested value in atom / cursor and yields stream of its value changes.
* Views are readonly versions of Cursors and more lightweight. The view
* checks for value changes with given `equiv` predicate
* (`@thi.ng/api/equiv` by default). If the predicate returns a falsy
* result, the new value is emitted on the stream. The first value
* emitted is always the (possibly transformed) current value at the
* stream's start time (i.e. when the first subscriber attaches).
*
* If the optional `tx` is given, the raw value is first passed to this
* transformer function and its result emitted on the stream.
*
* When the stream is cancelled the view is destroyed as well.
*
* See:
* - fromAtom()
* - @thi.ng/atom
*
* ```
* db = new Atom({a: 1, b: {c: 2}});
*
* fromView(db, "b.c", (x) => x != null ? x : "n/a").subscribe(trace("view:"))
* // view: 2
*
* db.swapIn("b.c", (x: number) => x + 1);
* // view: 3
*
* db.reset({a: 10});
* // view: n/a
* ```
*
* @param atom
* @param path
* @param tx
*/
export function fromView<T>(atom: ReadonlyAtom<any>, path: Path, tx?: ViewTransform<T>, equiv?: Predicate2<any>): Stream<T> {
return new Stream<T>((stream) => {
let isActive = true;
const view = new View<T>(
atom,
path,
tx ?
(x) => isActive && (x = tx(x), stream.next(x), x) :
(x) => isActive && (stream.next(x), x),
false,
equiv
);
return () => (isActive = false, view.release());
});
}
3 changes: 3 additions & 0 deletions packages/rstream/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export * from "./api";
export * from "./stream";
export * from "./stream-merge";
export * from "./stream-sync";
export * from "./subscription";

export * from "./from/atom";
Expand All @@ -10,6 +11,7 @@ export * from "./from/iterable";
export * from "./from/promise";
export * from "./from/promises";
export * from "./from/raf";
export * from "./from/view";
export * from "./from/worker";

export * from "./subs/bisect";
Expand All @@ -19,5 +21,6 @@ export * from "./subs/resolve";
export * from "./subs/sidechain-partition";
export * from "./subs/sidechain-toggle";
export * from "./subs/trace";
export * from "./subs/transduce";

export * from "./utils/worker";
Loading

0 comments on commit e58c5e0

Please sign in to comment.