Skip to content

Commit

Permalink
feat(rstream): update StreamSync to use reset: false by default, up…
Browse files Browse the repository at this point in the history
…date docs
  • Loading branch information
postspectacular committed Aug 3, 2018
1 parent 2f0235e commit 55499cc
Showing 1 changed file with 36 additions and 13 deletions.
49 changes: 36 additions & 13 deletions packages/rstream/src/stream-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,42 @@ import { ISubscribable, State } from "./api";
import { Subscription } from "./subscription";

export interface StreamSyncOpts<A, B> extends IID<string> {
/**
* Either an array or object of input streams / subscribables. If
* the latter, the object keys are used to label the inputs, else
* their `id` is used as label.
*/
src: ISubscribable<A>[] | IObjectOf<ISubscribable<A>>;
/**
* Optional transducer applied to the synced result tuple objects.
*/
xform: Transducer<IObjectOf<A>, B>;
/**
* If true, StreamSync waits for new values from *all*
* inputs before a new tuple is produced. If false (default),
* synchronization only happens for the very first tuple.
*/
reset: boolean;
/**
* By default, the last emitted tuple is allowed to be incomplete
* (in case all inputs closed). To only allow complete tuples, set
* the `all` to false.
*/
all: boolean;
/**
* If false, StreamSync stays active even if all inputs are done.
*/
close: boolean;
}


/**
* Similar to `StreamMerge`, but with extra synchronization of inputs.
* Before emitting any new values, `StreamSync` collects values until at
* least one has been received from *all* inputs. Once that's the case,
* the collected values are sent as labeled tuple object to downstream
* subscribers and the process repeats until all inputs are exhausted.
* Similar to `StreamMerge` (`merge()`), but with extra synchronization
* of inputs. Before emitting any new values, `StreamSync` collects
* values until at least one has been received from *all* inputs. Once
* that's the case, the collected values are sent as labeled tuple
* object to downstream subscribers and the process repeats until all
* inputs are exhausted. Any done inputs are automatically removed.
*
* In addition to the default mode of operation, i.e. waiting for new
* values from *all* inputs before another tuple is produced, the
Expand All @@ -35,12 +58,12 @@ export interface StreamSyncOpts<A, B> extends IID<string> {
* stream ID. Only the last value received from each input is passed on.
*
* ```
* sync = new StreamSync({src: [a=new Stream("a"), b=new Stream("b")]});
* sync.subscribe(trace());
* s = sync({src: [a=new Stream("a"), b=new Stream("b")]});
* s.subscribe(trace("result: "));
*
* a.next(1);
* b.next(2);
* // { a: 1, b: 2 }
* // result: { a: 1, b: 2 }
* ```
*
* Input streams can be added and removed dynamically and the emitted
Expand All @@ -57,6 +80,10 @@ export interface StreamSyncOpts<A, B> extends IID<string> {
* the @thi.ng/transducers package. See this function's docs for further
* details.
*/
export function sync<A, B>(opts: Partial<StreamSyncOpts<A, B>>) {
return new StreamSync(opts);
}

export class StreamSync<A, B> extends Subscription<A, B> {

/**
Expand Down Expand Up @@ -85,7 +112,7 @@ export class StreamSync<A, B> extends Subscription<A, B> {
constructor(opts: Partial<StreamSyncOpts<A, B>>) {
let srcIDs = new Set<string>();
let xform: Transducer<any, any> = comp(
partitionSync<A>(srcIDs, (x) => x[0], opts.reset !== false, opts.all !== false),
partitionSync<A>(srcIDs, (x) => x[0], opts.reset === true, opts.all !== false),
mapVals((x) => x[1])
);
if (opts.xform) {
Expand Down Expand Up @@ -214,7 +241,3 @@ export class StreamSync<A, B> extends Subscription<A, B> {
}
}
}

export function sync<A, B>(opts: Partial<StreamSyncOpts<A, B>>) {
return new StreamSync(opts);
}

0 comments on commit 55499cc

Please sign in to comment.