This repository contains a streams package. Streams is a very simple abstraction that combines functional programming, arbitrary data sync and reactivity.
A primary goal of the streams approach is extreme simplicity which allows data-synchronization to be almost transparent.
A stream is a logical value that changes over time. In keeping with
the immutable feel, the value itself is not mutated but instead a
pointer to the next
version is maintained. The wrap
function
described below creates streams out of raw values as needed.
A key aspect of the implementation here is that when multiple clients connect to the same logical stream, the independent changes converge to the same value.
NOTE: This is an implementation of FRP, using the pull model rather than the push model (though the same API exposed here can also effectively be implemented with a push model). One major difference with most FRP systems is that this package implements convergence when used in a distributed setting. This is done using Operational Transformation.
- Documentation
- Roadmap
A stream can be created by wrapping any object:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
let s1 = wrap("hello");
expect(s1.valueOf()).to.equal("hello");
For most practical purposes, the wrapped object works like the original object:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
let s1 = wrap("hello");
expect(s1 + " world").to.equal("hello world");
NOTE: Most applications need network synchronized streams and do not typically create adhoc streams as in these examples.
The wrapping works transparently for hashes as well.
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
let s1 = wrap({hello: {boo: "hoo"}});
expect(s1.hello.boo + "t").to.equal("hoot");
All wrapped objects allow being replaced with another value:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
let s1 = wrap("hello");
let s2 = s1.replace("world");
expect("hello " + s2).to.equal("hello world");
Note that replace returns a new value leaving the original as is. But all older versions of the object can obtain the latest value by calling latest:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
let s1 = wrap("hello");
let s2 = s1.replace("world");
expect("" + s1.latest()).to.equal("" + s2);
Fields can be modified via the dot notation:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
let s1 = wrap({hello: {world: "hey"}})
let s2 = s1.hello.world.replace("world");
expect(s2.valueOf()).to.equal("world");
expect(s1.latest().hello.world.valueOf()).to.equal("world");
The replacePath
method is like replace
except it takes a path
instead of using the dot notation:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
let s1 = wrap({hello: {world: "hey"}})
let s2 = s1.replacePath(["hello", "world"], "boo");
expect(s1.latest().hello.world.valueOf()).to.equal("boo");
expect(s2.hello.world.valueOf()).to.equal("boo");
This is also a convenient way to set an inner field if that path doesn't exist:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
let s1 = wrap({hello: "world"})
let s2 = s1.replacePath(["boo", "hoo"], "hoo");
expect(s2.boo.hoo.valueOf()).to.equal("hoo");
expect(s1.latest().boo.hoo.valueOf()).to.equal("hoo");
A field that does not exist can also be fetched using get which
maps to a wrapped null
value, but implements replace
for
convenience:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
let s1 = wrap({hello: "world"})
let inner = s1.get("boo").get("hoo");
expect(inner.valueOf()).to.equal(null);
s1.get("boo").get("hoo").replace("hoot");
expect(s1.latest().boo.hoo.valueOf()).to.equal("hoot");
expect(inner.latest().valueOf()).to.equal("hoot");
Fields can be deleted by replacing with null:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
let s1 = wrap({hello: "world"})
s1.hello.replace(null);
expect(s1.exists("hello")).to.equal(true);
expect(s1.latest().exists("hello")).to.equal(false);
expect(JSON.stringify(s1.latest())).to.equal("{}");
Mutations automatically converge on a stream. Note that using a stream that is synchronized would mean that this convergence happens across network clients: i.e. all clients with this state automatically converge:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
let s = wrap({hello: "world", boo: "hoo"});
// edit hello and boo separately on top of s
s.hello.replace("World");
s.boo.replace("Hoo");
// now s.latest() would have the updated values of both
s = s.latest();
expect(s.hello.valueOf()).to.equal("World");
expect(s.boo.valueOf()).to.equal("Hoo");
When multiple mutations conflict, the last writer generally wins.
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
let s = wrap({});
// write to the same key twice, both edits made on top of the same s
s.get("hello").replace("World");
s.get("hello").replace("Goodbye");
// now s.latest() holds the value from the later of the two writes
s = s.latest();
expect(s.hello.valueOf()).to.equal("Goodbye");
Streams can be combined to create more streams using the merge
,
object
and watch
functions as well as the collection functions
like map
and filter
.
Two separate object streams can be combined with merge
:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {merge} from "github.com/dotchain/streams/es6";
let s1 = wrap({hello: "world"});
let s2 = wrap({boo: "hoo"});
let s3 = merge([s1, s2]);
s2.boo.replace("hoot");
expect(s3.latest().boo.valueOf()).to.equal("hoot");
expect(s3.latest().hello.valueOf()).to.equal("world");
When the same key is present in multiple streams, the last one wins:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {merge} from "github.com/dotchain/streams/es6";
let s1 = wrap({hello: "world", ok: "computer"});
let s2 = wrap({hello: "goodbye", boo: "hoo"});
let s3 = merge([s1, s2]);
expect(s3.hello.valueOf()).to.equal("goodbye");
Modifying a key (or some path) correctly transfers those mutations to the underlying streams.
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {merge} from "github.com/dotchain/streams/es6";
let s1 = wrap({hello: "world"});
let s2 = wrap({boo: "hoo"});
let s3 = merge([s1, s2]);
s3.boo.replace("hoot");
expect(s2.latest().boo.valueOf()).to.equal("hoot");
When streams are merged, new keys always end up being added on the last stream.
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {merge} from "github.com/dotchain/streams/es6";
let s1 = wrap({hello: "world"});
let s2 = wrap({boo: "hoo"});
let s3 = merge([s1, s2]);
s3.get("la la").replace("la di da");
expect(s2.latest()["la la"].valueOf()).to.equal("la di da");
Deleting a key from a merged stream correctly deletes it from the right underlying stream:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {merge} from "github.com/dotchain/streams/es6";
let s1 = wrap({hello: "world"});
let s2 = wrap({boo: "hoo"});
let s3 = merge([s1, s2]);
s3.hello.replace(null);
expect(JSON.stringify(s1.latest())).to.equal("{}");
s3 = s3.latest();
expect(JSON.stringify(s3)).to.equal('{"boo":"hoo"}');
Deleting a key from a merged stream may surface an older key:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {merge} from "github.com/dotchain/streams/es6";
let s1 = wrap({hello: "world", ok: "computer"});
let s2 = wrap({boo: "hoo", ok: "not a computer"});
let s3 = merge([s1, s2]);
expect(s3.ok.valueOf()).to.equal("not a computer");
s3.ok.replace(null);
s3 = s3.latest();
expect(s3.ok.valueOf()).to.equal("computer");
Streams can also be combined to form static shapes using object
:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {object} from "github.com/dotchain/streams/es6";
let name1 = wrap("Joe");
let name2 = wrap("Shmoe");
let name = object({first: name1, last: name2});
name1.replace("John");
name2.replace("Doe");
expect(name.latest().first.valueOf()).to.equal("John");
expect(name.latest().last.valueOf()).to.equal("Doe");
const expected = {first: "John", last: "Doe"};
expect(JSON.stringify(name.latest())).to.equal(JSON.stringify(expected));
Static shapes do not allow adding or removing keys but modifying a key (or some path) correctly transfers those mutations to the underlying streams:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {object} from "github.com/dotchain/streams/es6";
let s1 = wrap("world");
let s2 = object({hello: s1});
s2.hello.replace("goodbye");
expect(s1.latest().valueOf()).to.equal("goodbye");
expect(s2.latest().hello.valueOf()).to.equal("goodbye");
The watch
function can be used to apply a specific function to every
instance. It acts a bit like map
acts on collections:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {watch} from "github.com/dotchain/streams/es6";
let name = wrap({first: "joe", last: "schmoe"});
let fullName = watch(name, name => name.first + " " + name.last);
expect(fullName.valueOf()).to.equal("joe schmoe");
name.first.replace("John");
name.latest().last.replace("Doe");
expect(fullName.latest().valueOf()).to.equal("John Doe");
The function passed to watch
can produce a stream. This is useful
when joining two collections:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {watch} from "github.com/dotchain/streams/es6";
let mappings = wrap({Richard: "Dick", Charles: "Chuck"})
let name = wrap({first: "Richard", last: "Feynman"})
let nick = watch(name, name => mappings[name.first.valueOf()]);
expect(nick.valueOf()).to.equal("Dick");
mappings.Richard.replace("Rick");
expect(nick.latest().valueOf()).to.equal("Rick");
name.first.replace("Charles");
expect(nick.latest().valueOf()).to.equal("Chuck");
Collections are represented as object hashes. The standard collection
methods implemented for hashes are: map
, filter
, groupBy
,
order
and orderBy
.
All of these functions take a function as a parameter which is
called on each item in the collection like so: fn(value, key)
.
The map
function calls the provided callback for each key and
returns a hash that has the values replaced with that of the provided
function.
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {map} from "github.com/dotchain/streams/es6";
// uppercase converts a string stream into upper case string stream
let uppercase = name => name.valueOf().toUpperCase();
let name = wrap({first: "joe", last: "schmoe"});
let mapped = map(name, uppercase);
expect(mapped.first.valueOf()).to.equal("JOE");
expect(mapped.last.valueOf()).to.equal("SCHMOE");
Note: the return value of map does not support being mutated.
The orderBy
function is useful for sorting collections. A sorted
collection guarantees that the forEachKey
iteration will visit in
the order specified:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {orderBy} from "github.com/dotchain/streams/es6";
const list = wrap({one: {x: 2}, two: {x: 1}});
const sorted = orderBy(list, (val, _key) => val.x);
let keys = [];
sorted.forEachKey(key => { keys.push(key) });
expect(JSON.stringify(keys)).to.equal(`["two","one"]`);
// updates remain sorted
list.one.x.replace(-1);
keys = [];
sorted.latest().forEachKey(key => { keys.push(key) });
expect(JSON.stringify(keys)).to.equal(`["one","two"]`);
Sorted collections are otherwise like regular collections (including the ability to mutate items etc).
The order
function is a more elaborate version of orderBy
-- the
callback is used as a comparison function:
- The signature of callback is
fn(val1, val2, key1, key2)
- The return value is negative if val1 comes before val2.
- The return value is positive if val1 comes after val2.
- The return value is zero if the two values are identical.
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {order} from "github.com/dotchain/streams/es6";
const list = wrap({one: {x: 2}, two: {x: 1}});
const sorted = order(list, (val1, val2, _1, _2) => val1.x - val2.x);
let keys = [];
sorted.forEachKey(key => { keys.push(key) });
expect(JSON.stringify(keys)).to.equal(`["two","one"]`);
// updates remain sorted
list.one.x.replace(-1);
keys = [];
sorted.latest().forEachKey(key => { keys.push(key) });
expect(JSON.stringify(keys)).to.equal(`["one","two"]`);
The filter
function returns the subset of the collection whose items satisfy
the provided predicate function:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {filter} from "github.com/dotchain/streams/es6";
const list = wrap({one: {x: 2}, two: {x: -1}});
const filtered = filter(list, (val, _key) => val.x > 0);
expect(JSON.stringify(filtered)).to.equal(`{"one":{"x":2}}`);
// updates work
list.two.x.replace(5);
expect(filtered.latest().get('two').get('x').valueOf()).to.equal(5);
Note that filtered values can be edited.
The groupBy
function groups values:
// import {expect} from "./expect.js";
// import {wrap} from "github.com/dotchain/streams/es6";
// import {groupBy} from "github.com/dotchain/streams/es6";
const row1 = {x: 5, y: 23};
const row2 = {x: 5, y: 11};
const row3 = {x: 9, y: 6};
const table = wrap({row1, row2, row3});
let grouped = groupBy(table, row => row.x);
let g = {'5': {row1, row2}, '9': {row3}};
expect(JSON.parse(JSON.stringify(grouped))).to.deep.equal(g);
// updates work
table.row2.x.replace(7);
grouped = grouped.latest();
g = {'5': {row1}, '9': {row3}, '7': {row2: {x: 7, y: 11}}};
expect(JSON.parse(JSON.stringify(grouped))).to.deep.equal(g);
A standalone server is needed for clients to connect to. The following is an example of such a server which uses a file storage mechanism (and properly serializes access to the file):
// import http from "http";
// import fs from "fs";
// import {serve} from "github.com/dotchain/streams/es6";
// import {FileStore} from "github.com/dotchain/streams/es6";
// import {transformStore} from "github.com/dotchain/streams/es6";
/**
 * Creates the example HTTP server and asks it to listen on port 8042.
 *
 * A FileStore over /tmp/ops.json is wrapped with transformStore, and every
 * incoming request is handed to serve together with that wrapped store.
 *
 * @returns the http server, so that the caller can close() it when done
 */
function startServer() {
  const store = new FileStore("/tmp/ops.json", fs);
  const xstore = transformStore(store);
  const server = http.createServer((req, res) => serve(xstore, req, res));
  server.listen(8042);
  return server;
}
The following example illustrates a browser setup. Note that this example includes an embedded server but that's just there to make the code testable.
The example also includes a fake local storage implementation -- a
browser set-up can just use window.localStorage.
// import http from "http";
// import fs from "fs";
// import fetch from "node-fetch";
// import {expect} from "./expect.js";
// import {serve} from "github.com/dotchain/streams/es6";
// import {urlTransport} from "github.com/dotchain/streams/es6";
// import {transformStore} from "github.com/dotchain/streams/es6";
// import {sync} from "github.com/dotchain/streams/es6";
// import {FileStore} from "github.com/dotchain/streams/es6";
// import {Cache} from "github.com/dotchain/streams/es6";
// start the embedded server first, then a client that talks to it
let server = startServer();
let {root, xport} = startClient();
// update root
root.replace("hello");
expect(root.latest() + "").to.equal("hello");
// push the changes to the server
await xport.push();
// check that these are visible on another client
let {root: root2, xport: xport2} = startClient();
await xport2.pull();
expect(root2.latest() + "").to.equal("hello");
// cleanup: stop the embedded server and remove the ops file used by its FileStore
server.close();
fs.unlinkSync("/tmp/ops.json");
/**
 * Builds one client for the browser example: a URL transport aimed at the
 * embedded server plus a synced root stream backed by a fake local storage.
 *
 * @returns an object holding the synced root and the transport as {root, xport}
 */
function startClient() {
  const xport = urlTransport("http://localhost:8042/", fetch);
  // window.localStorage on browsers
  const cache = new Cache(fakeLocalStorage());
  const root = sync(cache, xport, newID());
  return {root, xport};
}
/**
 * Creates an ID generator with its own private counter.
 *
 * @returns a function that yields "1", "2", "3", ... on successive calls
 */
function newID() {
  let issued = 0;
  return () => {
    issued += 1;
    return String(issued);
  };
}
/**
 * In-memory stand-in for window.localStorage used by the examples.
 *
 * Only setItem and getItem are provided. Values are converted to strings
 * when stored; reading a key that was never set yields undefined.
 *
 * @returns an object with setItem(key, value) and getItem(key)
 */
function fakeLocalStorage() {
  const entries = {};

  function setItem(key, value) {
    entries[key] = value + "";
  }

  function getItem(key) {
    return entries[key];
  }

  return {setItem, getItem};
}
/**
 * Creates the embedded HTTP server for the example and asks it to listen
 * on port 8042. Requests are passed to serve along with a FileStore over
 * /tmp/ops.json that has been wrapped by transformStore.
 *
 * @returns the http server, so that the caller can close() it when done
 */
function startServer() {
  const opsStore = transformStore(new FileStore("/tmp/ops.json", fs));
  const handleRequest = (req, res) => serve(opsStore, req, res);
  const httpServer = http.createServer(handleRequest);
  httpServer.listen(8042);
  return httpServer;
}
The browser example above can be adapted to a node-js client setup very directly but a different use case is where the server uses streams connected to a local file storage directly (maybe exposed via REST api endpoints).
// import fs from "fs";
// import {expect} from "./expect.js";
// import {serve} from "github.com/dotchain/streams/es6";
// import {Transport} from "github.com/dotchain/streams/es6";
// import {sync} from "github.com/dotchain/streams/es6";
// import {FileStore} from "github.com/dotchain/streams/es6";
// import {Cache} from "github.com/dotchain/streams/es6";
// no http server here: each client's transport is built directly on a FileStore
let {root, xport} = startClient();
// update root
root.replace("hello");
expect(root.latest() + "").to.equal("hello");
// push the changes to the server
await xport.push();
// check that these are visible on another client
let {root: root2, xport: xport2} = startClient();
await xport2.pull();
expect(root2.latest() + "").to.equal("hello");
// cleanup: remove the ops file used by the FileStore
fs.unlinkSync("/tmp/ops.json");
/**
 * Builds one client for the node example: a Transport layered directly on
 * a FileStore over /tmp/ops.json plus a synced root stream backed by a
 * fake local storage.
 *
 * @returns an object holding the synced root and the transport as {root, xport}
 */
function startClient() {
  const fileStore = new FileStore("/tmp/ops.json", fs);
  const xport = new Transport(fileStore);
  // window.localStorage on browsers
  const cache = new Cache(fakeLocalStorage());
  const root = sync(cache, xport, newID());
  return {root, xport};
}
/**
 * Creates an ID generator with its own private counter.
 *
 * @returns a function that yields "1", "2", "3", ... on successive calls
 */
function newID() {
  let issued = 0;
  return () => {
    issued += 1;
    return String(issued);
  };
}
/**
 * In-memory stand-in for window.localStorage used by the examples.
 *
 * Only setItem and getItem are provided. Values are converted to strings
 * when stored; reading a key that was never set yields undefined.
 *
 * @returns an object with setItem(key, value) and getItem(key)
 */
function fakeLocalStorage() {
  const entries = {};

  function setItem(key, value) {
    entries[key] = value + "";
  }

  function getItem(key) {
    return entries[key];
  }

  return {setItem, getItem};
}
Other standard types like number, boolean, Date or null can be wrapped as well. When wrapped, these do implement the valueOf method, so callers can use that to unwrap the values.
Date is a little special in that the wrapped value gets serialized
to {type: "date", value: <utc_milliseconds>}
instead of a human
readable ISO string. Unwrap
returns this value too (though
valueOf
returns the native Date object).
- Minimal E2E implementation:
  - streams.wrap for string type only
  - Only method supported by string type is replace()
  - Only change type is Replace
  - Simple transport (no merge/rebasing)
  - sync() implementation
  - In memory server
- Server persistence to files
- Local session state caching
- More atomic types (bool, number, date)
- Dict type
  - streams.wrap support
  - PathChange change type
  - fields accessible using dot notation
- Collections
  - map, order, orderBy, filter, groupBy
- Composition
  - watch, object, merge
- Collaboration
  - merge support in change types
  - merge support in stream base class
  - merge support in streams.sync()
  - transformed operations
  - multiple tabs support
- add merge tests
- Branch merge support
- Server support
- snapshots
- db storage
- expose writeable streams via store API
- Mutable collections support
  - order, orderBy, filter, groupBy
- Mutable composition support
  - object
  - merge