Skip to content

Commit

Permalink
feat(rstream): add syncRAF()
Browse files Browse the repository at this point in the history
- add SyncRAF class and syncRAF() factory
- deprecate sidechainPartitionRAF()
- add tests
  • Loading branch information
postspectacular committed Mar 31, 2023
1 parent 2bf4094 commit 3c17520
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 0 deletions.
3 changes: 3 additions & 0 deletions packages/rstream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@
"./subscription": {
"default": "./subscription.js"
},
"./sync-raf": {
"default": "./sync-raf.js"
},
"./sync": {
"default": "./sync.js"
},
Expand Down
1 change: 1 addition & 0 deletions packages/rstream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export * from "./sidechain-toggle.js";
export * from "./stream.js";
export * from "./subscription.js";
export * from "./sync.js";
export * from "./sync-raf.js";
export * from "./timeout.js";
export * from "./trace.js";
export * from "./transduce.js";
Expand Down
2 changes: 2 additions & 0 deletions packages/rstream/src/sidechain-partition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ export const sidechainPartition = <A, B>(
* ```
*
* @param src -
*
* @deprecated use {@link syncRAF} instead
*/
export const sidechainPartitionRAF = <T>(src: ISubscribable<T>) =>
src
Expand Down
76 changes: 76 additions & 0 deletions packages/rstream/src/sync-raf.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { isNode } from "@thi.ng/checks/is-node";
import { State, type ISubscribable, type CommonOpts } from "./api.js";
import { Subscription } from "./subscription.js";

export class SyncRAF<T> extends Subscription<T, T> {
queued?: T;
raf?: number | NodeJS.Timeout;

constructor(opts?: Partial<CommonOpts>) {
super(undefined, opts);
}

next(x: T) {
if (this.state >= State.DONE) return;
this.queued = x;
if (!this.raf) {
const update = () => {
if (this.state < State.DONE) super.next(this.queued!);
this._clean();
};
this.raf = isNode()
? setTimeout(update, 16)
: requestAnimationFrame(update);
}
}

done() {
this._clean();
super.done();
}

error(e: any) {
this._clean();
return super.error(e);
}

protected _clean() {
if (this.raf) {
isNode()
? clearTimeout(this.raf)
: cancelAnimationFrame(<number>this.raf);
}
this.raf = this.queued = undefined;
}
}

/**
* Similar to (in in effect the same as the now deprecated)
* {@link sidechainPartitionRAF}, however more performant & lightweight.
* Synchronizes downstream processing w/ `requestAnimationFrame()`. The returned
* subscription delays & debounces any high frequency intra-frame input values
* and passes only most recent one downstream during next RAF event processing.
*
* This example uses thi.ng/atom as state container. Also see {@link fromAtom}.
*
* @example
* ```ts
* const atom = defAtom("alice");
*
* // any changes to the atom will only be received by this subscription
* // during next RAF update cycle
* syncRAF(fromAtom(atom)).subscribe({
* next({ name }) { document.body.innerText = name; }
* });
*
* // trigger update
* atom.reset("bob");
* ```
*
* @param src -
* @param opts -
*/
export const syncRAF = <T>(
parent: ISubscribable<T>,
opts?: Partial<CommonOpts>
) => parent.subscribe(new SyncRAF<T>(opts));
33 changes: 33 additions & 0 deletions packages/rstream/test/sync-raf.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { group } from "@thi.ng/testament";
import * as assert from "assert";
import { syncRAF, stream, State } from "../src/index.js";

group("syncRAF", {
basic: ({ done }) => {
const a = stream();
const a2 = syncRAF(a);
a.next(1);
assert.strictEqual(a.deref(), 1);
assert.strictEqual(a2.deref(), undefined);
setTimeout(() => {
assert.strictEqual(a.deref(), 2);
assert.strictEqual(a2.deref(), 2);
done();
}, 20);
a.next(2);
},

"early done": ({ done }) => {
const a = stream();
const a2 = syncRAF(a);
a.next(1);
a.done();
setTimeout(() => {
assert.strictEqual(a.getState(), State.UNSUBSCRIBED);
assert.strictEqual(a.deref(), undefined);
assert.strictEqual(a2.getState(), State.UNSUBSCRIBED);
assert.strictEqual(a2.deref(), undefined);
done();
}, 20);
},
});

0 comments on commit 3c17520

Please sign in to comment.