Skip to content

Commit

Permalink
fix(rstream): update StreamSync.remove(), refactor ctor
Browse files Browse the repository at this point in the history
- remove ID from invRealSourceIDs
- update ctor xform init, avoid one level of comp() if opts.xform given
  • Loading branch information
postspectacular committed Aug 22, 2019
1 parent 5214f9a commit d5fd4b4
Showing 1 changed file with 15 additions and 23 deletions.
38 changes: 15 additions & 23 deletions packages/rstream/src/stream-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import {
comp,
labeled,
mapVals,
partitionSync,
Transducer
partitionSync
} from "@thi.ng/transducers";
import {
CloseMode,
Expand Down Expand Up @@ -121,34 +120,29 @@ export class StreamSync<A, B> extends Subscription<A, B> {
sourceIDs: Set<string>;

constructor(opts: Partial<StreamSyncOpts<A, B>>) {
let srcIDs = new Set<string>();
let xform: Transducer<[string, A], any> = comp(
partitionSync(srcIDs, {
key: (x) => x[0],
mergeOnly: opts.mergeOnly === true,
reset: opts.reset === true,
all: opts.all !== false
}),
mapVals((x) => x[1])
);
if (opts.xform) {
xform = comp(xform, opts.xform);
}
const srcIDs = new Set<string>();
const psync = partitionSync<[string, A]>(srcIDs, {
key: (x) => x[0],
mergeOnly: opts.mergeOnly === true,
reset: opts.reset === true,
all: opts.all !== false
});
const mapv = mapVals((x: [string, A]) => x[1]);
super(
undefined,
optsWithID("streamsync", <Partial<StreamSyncOpts<any, any>>>{
...opts,
xform
xform: opts.xform
? comp(psync, mapv, opts.xform)
: comp(psync, mapv)
})
);
this.sources = new Map();
this.realSourceIDs = new Map();
this.invRealSourceIDs = new Map();
this.idSources = new Map();
this.sourceIDs = srcIDs;
if (opts.src) {
this.addAll(opts.src);
}
opts.src && this.addAll(opts.src);
}

add(src: ISubscribable<A>, id?: string) {
Expand Down Expand Up @@ -205,6 +199,7 @@ export class StreamSync<A, B> extends Subscription<A, B> {
LOGGER.info(`removing src: ${src.id} (${id})`);
this.sourceIDs.delete(id);
this.realSourceIDs.delete(id);
this.invRealSourceIDs.delete(src.id);
this.idSources.delete(src.id);
this.sources.delete(src);
sub.unsubscribe();
Expand All @@ -215,10 +210,7 @@ export class StreamSync<A, B> extends Subscription<A, B> {

removeID(id: string) {
const src = this.getSourceForID(id);
if (src) {
return this.remove(src);
}
return false;
return src ? this.remove(src) : false;
}

removeAll(src: ISubscribable<A>[]) {
Expand Down

0 comments on commit d5fd4b4

Please sign in to comment.