diff --git a/packages/rstream/src/forkjoin.ts b/packages/rstream/src/forkjoin.ts index a8fc403f98..bbd1589cb0 100644 --- a/packages/rstream/src/forkjoin.ts +++ b/packages/rstream/src/forkjoin.ts @@ -129,8 +129,8 @@ export const forkJoin = ( String(id), opts.src .transform(map((x: IN) => opts.fork(id, numWorkers, x))) - .subscribe( - tunnel({ + .subscribe( + tunnel({ src: opts.worker, transferables: opts.transferables, interrupt: opts.interrupt === true, diff --git a/packages/rstream/src/from/promise.ts b/packages/rstream/src/from/promise.ts index 61ff2a7a8b..4f0855e04e 100644 --- a/packages/rstream/src/from/promise.ts +++ b/packages/rstream/src/from/promise.ts @@ -1,4 +1,4 @@ -import { CloseMode, CommonOpts, State } from "../api"; +import { CloseMode, State, WithErrorHandlerOpts } from "../api"; import { Stream } from "../stream"; import { optsWithID } from "../utils/idgen"; @@ -13,7 +13,10 @@ import { optsWithID } from "../utils/idgen"; * @param src - * @param opts - */ -export const fromPromise = (src: Promise, opts?: Partial) => { +export const fromPromise = ( + src: Promise, + opts?: Partial +) => { let canceled = false; let isError = false; let err: any = {}; diff --git a/packages/rstream/src/from/promises.ts b/packages/rstream/src/from/promises.ts index e4c453d88c..e74c1c3568 100644 --- a/packages/rstream/src/from/promises.ts +++ b/packages/rstream/src/from/promises.ts @@ -1,5 +1,5 @@ import { mapcat } from "@thi.ng/transducers"; -import type { CommonOpts, ISubscription } from "../api"; +import type { ISubscription, WithErrorHandlerOpts } from "../api"; import { optsWithID } from "../utils/idgen"; import { fromPromise } from "./promise"; @@ -38,10 +38,11 @@ import { fromPromise } from "./promise"; * ``` * * @param promises - + * @param opts - */ export const fromPromises = ( promises: Iterable>, - opts?: Partial + opts?: Partial ): ISubscription => fromPromise(Promise.all(promises), optsWithID("promises", opts)).transform( mapcat((x: T[]) => x) diff --git a/packages/rstream/src/from/worker.ts b/packages/rstream/src/from/worker.ts index 09515acc0d..96e9b9ccb1 100644 --- a/packages/rstream/src/from/worker.ts +++ b/packages/rstream/src/from/worker.ts @@ -1,9 +1,9 @@ -import { CommonOpts, LOGGER } from "../api"; +import { LOGGER, WithErrorHandlerOpts } from "../api"; import { Stream } from "../stream"; import { optsWithID } from "../utils/idgen"; import { makeWorker } from "../utils/worker"; -export interface FromWorkerOpts extends CommonOpts { +export interface FromWorkerOpts extends WithErrorHandlerOpts { /** * If true, the worker will be terminated when the stream * is being closed. @@ -43,17 +43,17 @@ export const fromWorker = ( const _worker = makeWorker(worker); opts = optsWithID("worker", opts); return new Stream((stream) => { - const ml = (e: MessageEvent) => { + const msgListener = (e: MessageEvent) => { stream.next(e.data); }; - const el = (e: MessageEvent) => { + const errListener = (e: MessageEvent) => { stream.error(e.data); }; - _worker.addEventListener("message", ml); - _worker.addEventListener("error", el); + _worker.addEventListener("message", msgListener); + _worker.addEventListener("error", errListener); return () => { - _worker.removeEventListener("message", ml); - _worker.removeEventListener("error", el); + _worker.removeEventListener("message", msgListener); + _worker.removeEventListener("error", errListener); if (opts!.terminate !== false) { LOGGER.info("terminating worker", _worker); _worker.terminate(); diff --git a/packages/rstream/src/index.ts b/packages/rstream/src/index.ts index 246136b0de..49f299cb1a 100644 --- a/packages/rstream/src/index.ts +++ b/packages/rstream/src/index.ts @@ -6,7 +6,6 @@ export * from "./stream"; export * from "./stream-merge"; export * from "./stream-sync"; export * from "./subscription"; -export * from "./sub2"; export * from "./trigger"; export * from "./tween"; diff --git a/packages/rstream/src/metastream.ts b/packages/rstream/src/metastream.ts index 68bcbe73fe..c47f3131ad 100644 --- a/packages/rstream/src/metastream.ts +++ b/packages/rstream/src/metastream.ts @@ -160,7 +160,7 @@ export class MetaStream extends Subscription { } unsubscribe(sub?: Subscription) { - if (this.stream && (!sub || this.subs.length === 1)) { + if (this.stream && (!sub || this.subs.size === 1)) { this.detach(!sub); } return super.unsubscribe(sub); diff --git a/packages/rstream/src/stream-sync.ts b/packages/rstream/src/stream-sync.ts index f5ef8007f6..5cffefa1ff 100644 --- a/packages/rstream/src/stream-sync.ts +++ b/packages/rstream/src/stream-sync.ts @@ -6,13 +6,7 @@ import { partitionSync, PartitionSync, } from "@thi.ng/transducers"; -import { - ISubscribable, - ISubscription, - LOGGER, - State, - TransformableOpts, -} from "./api"; +import { ISubscribable, ISubscription, LOGGER, TransformableOpts } from "./api"; import { Subscription } from "./subscription"; import { isFirstOrLastInput } from "./utils/checks"; import { optsWithID } from "./utils/idgen"; @@ -271,10 +265,10 @@ export class StreamSync< unsubscribe(sub?: Subscription) { if (!sub) { + LOGGER.debug(this.id, "unsub sources"); for (let s of this.sources.values()) { s.unsubscribe(); } - this.state = State.DONE; this.sources.clear(); this.psync.clear(); this.realSourceIDs.clear(); diff --git a/packages/rstream/src/stream.ts b/packages/rstream/src/stream.ts index 78fe19d87a..ae5c9faf8d 100644 --- a/packages/rstream/src/stream.ts +++ b/packages/rstream/src/stream.ts @@ -4,10 +4,12 @@ import { CommonOpts, IStream, ISubscriber, + ISubscription, LOGGER, StreamCancel, StreamSource, TransformableOpts, + WithErrorHandlerOpts, } from "./api"; import { Subscription } from "./subscription"; import { optsWithID } from "./utils/idgen"; @@ -97,27 +99,32 @@ export class Stream extends Subscription implements IStream { protected _cancel: StreamCancel | undefined; protected _inited: boolean; - constructor(opts?: Partial); - constructor(src: StreamSource, opts?: Partial); + constructor(opts?: Partial); + constructor(src: StreamSource, opts?: Partial); constructor( - src?: StreamSource | Partial, - opts?: Partial + src?: StreamSource | Partial, + opts?: Partial ) { - const [_src, _opts] = isFunction(src) ? [src, opts] : [undefined, src]; - super(undefined, optsWithID("stream", _opts)); + const [_src, _opts] = isFunction(src) + ? [src, opts || {}] + : [undefined, src || {}]; + super( + _opts.error ? { error: _opts.error } : undefined, + optsWithID("stream", _opts) + ); this.src = _src; this._inited = false; } - subscribe(sub: Subscription): Subscription; + subscribe(sub: ISubscription): ISubscription; subscribe( sub: Partial>, opts?: Partial - ): Subscription; + ): ISubscription; subscribe( sub: Partial>, opts?: Partial> - ): Subscription; + ): ISubscription; subscribe( sub: Partial>, opts: Partial> = {} @@ -130,12 +137,12 @@ export class Stream extends Subscription implements IStream { return $sub; } - unsubscribe(sub?: Subscription) { + unsubscribe(sub?: ISubscription) { const res = super.unsubscribe(sub); if ( res && (!sub || - ((!this.subs || !this.subs.length) && + ((!this.subs || !this.subs.size) && this.closeOut !== CloseMode.NEVER)) ) { this.cancel(); @@ -151,7 +158,7 @@ export class Stream extends Subscription implements IStream { } error(e: any) { - super.error(e); + if (super.error(e)) return true; this.cancel(); return false; } diff --git a/packages/rstream/src/sub2.ts b/packages/rstream/src/sub2.ts deleted file mode 100644 index b85a83d13b..0000000000 --- a/packages/rstream/src/sub2.ts +++ /dev/null @@ -1,304 +0,0 @@ -import { assert, Fn, NULL_LOGGER, SEMAPHORE } from "@thi.ng/api"; -import { isPlainObject } from "@thi.ng/checks"; -import { - comp, - isReduced, - map, - peek, - push, - Reduced, - Reducer, - Transducer, - unreduced, -} from "@thi.ng/transducers"; -import { - CloseMode, - CommonOpts, - ISubscriber, - ISubscription, - LOGGER, - State, - SubscriptionOpts, - TransformableOpts, - WithErrorHandlerOpts, - WithTransform, -} from "./api"; -import type { Subscription } from "./subscription"; -import { optsWithID } from "./utils/idgen"; - -/** - * WIP implementation of new subscription type (will replace existing - * `Subscription` class) - */ -export class Sub2 implements ISubscription { - id: string; - closeIn: CloseMode; - closeOut: CloseMode; - parent?: ISubscription; - __owner?: ISubscription; - - protected xform?: Reducer; - protected cacheLast: boolean; - protected last: any = SEMAPHORE; - protected state = State.IDLE; - protected subs = new Set>>(); - - constructor( - protected wrapped?: Partial>, - opts?: Partial> - ) { - opts = optsWithID(`$sub`, { - closeIn: CloseMode.LAST, - closeOut: CloseMode.LAST, - cache: true, - ...opts, - }); - this.parent = opts.parent; - this.id = opts.id!; - this.closeIn = opts.closeIn!; - this.closeOut = opts.closeOut!; - this.cacheLast = opts.cache!; - opts.xform && (this.xform = opts.xform(push())); - } - - deref(): B | undefined { - return this.last !== SEMAPHORE ? this.last : undefined; - } - - getState(): State { - return this.state; - } - - subscribe(sub: Sub2): ISubscription; - subscribe( - sub: Partial>, - opts?: Partial - ): ISubscription; - subscribe( - sub: Partial>, - opts?: Partial> - ): ISubscription; - subscribe( - sub: Partial>, - opts: Partial> = {} - ): any { - let $sub: ISubscriber; - if (sub instanceof Sub2) { - assert(!!sub.parent, `sub '${sub.id}' already has a parent`); - sub.parent = this; - $sub = sub; - } else { - $sub = new Sub2(sub, { ...opts, parent: this }); - } - this.subs.add($sub); - this.last != SEMAPHORE && $sub.next(this.last); - return $sub; - } - - transform( - a: Transducer, - opts?: Partial - ): Subscription; - transform( - a: Transducer, - b: Transducer, - opts?: Partial - ): Subscription; - transform( - a: Transducer, - b: Transducer, - c: Transducer, - opts?: Partial - ): Subscription; - transform( - a: Transducer, - b: Transducer, - c: Transducer, - d: Transducer, - opts?: Partial - ): Subscription; - transform( - opts: WithTransform & Partial - ): Subscription; - transform(...args: any[]) { - let sub: Partial> | undefined; - let opts: Partial> | undefined; - if (isPlainObject(peek(args))) { - opts = args.pop(); - sub = { error: (opts).error }; - } - return this.subscribe( - sub, - optsWithID( - "xform", - args.length > 0 - ? { - ...opts!, - // @ts-ignore - xform: comp(...args), - } - : opts - ) - ); - } - - /** - * Syntax sugar for {@link Subscription.transform} when using a - * single {@link @thi.ng/transducers#map} transducer only. The given - * function `fn` is used as `map`'s transformation fn. - * - * @param fn - * @param opts - */ - map( - fn: Fn, - opts?: Partial - ): ISubscription { - return this.transform(map(fn), opts || {}); - } - - unsubscribe(sub?: Partial>) { - LOGGER.debug(this.id, "unsub start", sub ? sub.id : "self"); - if (!sub) { - this.parent && this.parent.unsubscribe(this); - this.state = State.UNSUBSCRIBED; - this.release(); - return true; - } - LOGGER.debug(this.id, "unsub child", sub.id); - if (this.subs.delete(sub)) { - if ( - this.closeOut === CloseMode.FIRST || - (!this.subs.size && this.closeOut !== CloseMode.NEVER) - ) { - this.unsubscribe(); - } - return true; - } - return false; - } - - next(x: A) { - if (this.state >= State.DONE) return; - this.xform ? this.dispatchXform(x) : this.dispatch(x); - } - - done() { - LOGGER.debug(this.id, "entering done()"); - if (this.state >= State.DONE) return; - if (this.xform) { - if (!this.dispatchXformDone()) return; - } - // attempt to call .done in wrapped sub - if (!this.dispatchTo("done")) return; - // disconnect from parent & internal cleanup - this.unsubscribe(); - this.state = State.DONE; - LOGGER.debug(this.id, "exiting done()"); - } - - error(e: any) { - // only the wrapped sub's error handler gets a chance - // to deal with the error - const sub = this.wrapped; - const hasErrorHandler = sub && sub.error; - hasErrorHandler && - LOGGER.debug(this.id, "attempting wrapped error handler"); - // flag success if error handler returns true - // (i.e. it could handle/recover from the error) - // else detach this entire sub by going into error state... - return (hasErrorHandler && sub!.error!(e)) || this.unhandledError(e); - } - - protected unhandledError(e: any) { - // ensure error is at least logged to console - // even if default NULL_LOGGER is used... - (LOGGER !== NULL_LOGGER ? LOGGER : console).warn( - this.id, - "unhandled error:", - e - ); - this.unsubscribe(); - this.state = State.ERROR; - return false; - } - - protected dispatchTo(type: "next" | "done", x?: B) { - let s: Partial> | undefined = this.wrapped; - if (s) { - try { - s[type] && s[type]!(x!); - } catch (e) { - // give wrapped sub a chance to handle error - // (if that failed then we're already in error state now & terminate) - if (!this.error(e)) return false; - } - } - // process other child subs - for (s of type === "next" ? this.subs : [...this.subs]) { - try { - s[type] && s[type]!(x!); - } catch (e) { - if (!s.error || !s.error(e)) { - // if no or failed handler, go into error state - return this.unhandledError(e); - } - } - } - return true; - } - - protected dispatch(x: B) { - LOGGER.debug(this.id, "dispatch", x); - this.cacheLast && (this.last = x); - this.dispatchTo("next", x); - } - - protected dispatchXform(x: A) { - let acc: B[] | Reduced; - try { - acc = this.xform![2]([], x); - } catch (e) { - // error in transducer can only be handled by the wrapped - // subscriber's error handler (if avail) - this.error(e); - // don't dispatch value(s) - return; - } - if (this.dispatchXformVals(acc)) { - isReduced(acc) && this.done(); - } - } - - protected dispatchXformDone() { - let acc: B[] | Reduced; - try { - // collect remaining values from transducer - acc = this.xform![1]([]); - } catch (e) { - // error in transducer can only be handled by the wrapped - // subscriber's error handler (if avail) - return this.error(e); - } - return this.dispatchXformVals(acc); - } - - protected dispatchXformVals(acc: B[] | Reduced) { - const uacc = unreduced(acc); - for ( - let i = 0, n = uacc.length; - i < n && this.state < State.DONE; - i++ - ) { - this.dispatch(uacc[i]); - } - return this.state < State.ERROR; - } - - protected release() { - this.subs.clear(); - delete this.parent; - delete this.xform; - delete this.last; - } -} diff --git a/packages/rstream/src/subs/asidechain.ts b/packages/rstream/src/subs/asidechain.ts index b17e930108..62058b6ba2 100644 --- a/packages/rstream/src/subs/asidechain.ts +++ b/packages/rstream/src/subs/asidechain.ts @@ -14,7 +14,7 @@ export abstract class ASidechain extends Subscription { unsubscribe(sub?: Subscription) { const res = super.unsubscribe(sub); - if (!sub || !this.subs.length) { + if (!sub || !this.subs.size) { this.sideSub.unsubscribe(); } return res; diff --git a/packages/rstream/src/subs/resolve.ts b/packages/rstream/src/subs/resolve.ts index a88fc1944a..bef46c7370 100644 --- a/packages/rstream/src/subs/resolve.ts +++ b/packages/rstream/src/subs/resolve.ts @@ -70,7 +70,11 @@ export class Resolver extends Subscription, T> { } done() { - if (this.parent!.getState() === State.DONE && this.outstanding === 0) { + if ( + this.parent && + this.parent.getState() === State.DONE && + this.outstanding === 0 + ) { super.done(); } } diff --git a/packages/rstream/src/subs/sidechain-partition.ts b/packages/rstream/src/subs/sidechain-partition.ts index 147d62d0cc..a51d17a3da 100644 --- a/packages/rstream/src/subs/sidechain-partition.ts +++ b/packages/rstream/src/subs/sidechain-partition.ts @@ -55,7 +55,7 @@ export const sidechainPartition = ( */ export const sidechainPartitionRAF = (src: ISubscribable) => src - .subscribe(sidechainPartition(fromRAF())) + .subscribe(sidechainPartition(fromRAF())) .transform(map(peek)); export class SidechainPartition extends ASidechain { diff --git a/packages/rstream/src/subs/timeout.ts b/packages/rstream/src/subs/timeout.ts index 224756622c..8544f2fff6 100644 --- a/packages/rstream/src/subs/timeout.ts +++ b/packages/rstream/src/subs/timeout.ts @@ -64,18 +64,19 @@ class Timeout extends Subscription { reset() { this.timeoutId = setTimeout(() => { if (this.state < State.DONE) { - this.error( + this.dispatchTo( + "error", this.errorObj || new Error( - `Timeout stream "${this.id}" after ${this.timeoutMs} ms` + `Timeout '${this.id}' after ${this.timeoutMs} ms` ) ); } }, this.timeoutMs); } - cleanup(): void { + release() { clearTimeout(this.timeoutId); - super.cleanup(); + super.release(); } } diff --git a/packages/rstream/src/subscription.ts b/packages/rstream/src/subscription.ts index 77864675ca..127f55f389 100644 --- a/packages/rstream/src/subscription.ts +++ b/packages/rstream/src/subscription.ts @@ -1,11 +1,11 @@ -import { Fn, NULL_LOGGER, SEMAPHORE } from "@thi.ng/api"; -import { peek } from "@thi.ng/arrays"; -import { implementsFunction, isPlainObject } from "@thi.ng/checks"; +import { assert, Fn, NULL_LOGGER, SEMAPHORE } from "@thi.ng/api"; +import { isPlainObject } from "@thi.ng/checks"; import { illegalState } from "@thi.ng/errors"; import { comp, isReduced, map, + peek, push, Reduced, Reducer, @@ -24,7 +24,7 @@ import { WithErrorHandlerOpts, WithTransform, } from "./api"; -import { nextID, optsWithID } from "./utils/idgen"; +import { optsWithID } from "./utils/idgen"; /** * Creates a new {@link Subscription} instance, the fundamental datatype @@ -80,43 +80,35 @@ export const subscription = ( opts?: Partial> ) => new Subscription(sub, opts); -/** - * @see {@link subscription} for reference & examples. - */ export class Subscription implements ISubscription { id: string; - closeIn: CloseMode; closeOut: CloseMode; - parent?: ISubscription; + __owner?: ISubscription; - protected subs: Partial>[]; protected xform?: Reducer; - protected state: State = State.IDLE; - protected cacheLast: boolean; - protected last: any; + protected last: any = SEMAPHORE; + protected state = State.IDLE; + protected subs = new Set>>(); constructor( - sub?: Partial>, - opts: Partial> = {} + protected wrapped?: Partial>, + opts?: Partial> ) { + opts = optsWithID(`$sub`, { + closeIn: CloseMode.LAST, + closeOut: CloseMode.LAST, + cache: true, + ...opts, + }); this.parent = opts.parent; - this.closeIn = - opts.closeIn !== undefined ? opts.closeIn : CloseMode.LAST; - this.closeOut = - opts.closeOut !== undefined ? opts.closeOut : CloseMode.LAST; - this.cacheLast = opts.cache !== false; - this.id = opts.id || `sub-${nextID()}`; - this.last = SEMAPHORE; - this.subs = []; - if (sub) { - this.subs.push(sub); - } - if (opts.xform) { - this.xform = opts.xform(push()); - } + this.id = opts.id!; + this.closeIn = opts.closeIn!; + this.closeOut = opts.closeOut!; + this.cacheLast = opts.cache!; + opts.xform && (this.xform = opts.xform(push())); } deref(): B | undefined { @@ -145,29 +137,19 @@ export class Subscription implements ISubscription { opts: Partial> = {} ): any { this.ensureState(); - let $sub: Subscription; - if (implementsFunction(sub, "subscribe") && !opts.xform) { - $sub = >sub; - $sub.parent = this; + let $sub: ISubscriber; + if (sub instanceof Subscription && !opts.xform) { + // ensure sub is still unattached + assert(!sub.parent, `sub '${sub.id}' already has a parent`); + sub.parent = this; + $sub = sub; } else { - $sub = subscription(sub, { parent: this, ...opts }); - } - this.last !== SEMAPHORE && $sub.next(this.last); - return this.addWrapped($sub); - } - - /** - * Returns array of new child subscriptions for all given - * subscribers. - * - * @param subs - - */ - subscribeAll(...subs: ISubscriber[]) { - const wrapped: ISubscription[] = []; - for (let s of subs) { - wrapped.push(this.subscribe(s)); + $sub = new Subscription(sub, { ...opts, parent: this }); } - return wrapped; + this.subs.add($sub); + this.state = State.ACTIVE; + this.last != SEMAPHORE && $sub.next(this.last); + return $sub; } /** @@ -240,33 +222,24 @@ export class Subscription implements ISubscription { return this.transform(map(fn), opts || {}); } - /** - * If called without arg, removes this subscription from parent (if - * any), cleans up internal state and goes into DONE state. If - * called with arg, removes the sub from internal pool and if no - * other subs are remaining also cleans up itself and goes into DONE - * state. - * - * @param sub - - */ - unsubscribe(sub?: ISubscription) { - LOGGER.debug(this.id, "unsub start", sub ? sub.id : "self"); - if (!sub) { - let res = true; - if (this.parent) { - res = this.parent.unsubscribe(this); - } - this.state = State.DONE; - this.cleanup(); - return res; - } + unsubscribe(sub?: Partial>) { + return sub ? this.unsubscribeChild(sub) : this.unsubscribeSelf(); + } + + protected unsubscribeSelf() { + LOGGER.debug(this.id, "unsub self"); + this.parent && this.parent.unsubscribe(this); + this.state < State.DONE && (this.state = State.UNSUBSCRIBED); + this.release(); + return true; + } + + protected unsubscribeChild(sub: Partial>) { LOGGER.debug(this.id, "unsub child", sub.id); - const idx = this.subs.indexOf(sub); - if (idx >= 0) { - this.subs.splice(idx, 1); + if (this.subs.delete(sub)) { if ( this.closeOut === CloseMode.FIRST || - (!this.subs.length && this.closeOut !== CloseMode.NEVER) + (!this.subs.size && this.closeOut !== CloseMode.NEVER) ) { this.unsubscribe(); } @@ -277,96 +250,79 @@ export class Subscription implements ISubscription { next(x: A) { if (this.state >= State.DONE) return; - this.xform ? this.dispatchXform(x) : this.dispatch(x); + this.xform ? this.dispatchXform(x) : this.dispatch(x); } done() { LOGGER.debug(this.id, "entering done()"); - if (this.state < State.DONE) { - try { - if (this.xform) { - const acc = this.xform[1]([]); - const uacc = unreduced(acc); - const n = uacc.length; - for (let i = 0; i < n; i++) { - this.dispatch(uacc[i]); - } - } - } catch (e) { - this.error(e); - return; - } - this.state = State.DONE; - for (let s of this.subs.slice()) { - try { - s.done && s.done(); - } catch (e) { - s.error ? s.error(e) : this.error(e); - } - } + if (this.state >= State.DONE) return; + if (this.xform) { + if (!this.dispatchXformDone()) return; + } + this.state = State.DONE; + // attempt to call .done in wrapped sub + if (this.dispatchTo("done")) { + // disconnect from parent & internal cleanup this.unsubscribe(); - LOGGER.debug(this.id, "exiting done()"); } + LOGGER.debug(this.id, "exiting done()"); } error(e: any) { - this.state = State.ERROR; - const subs = this.subs; - let notified = false; - if (subs.length) { - for (let s of subs.slice()) { - if (s.error) { - s.error(e); - notified = true; - } - } - } - if (!notified) { - // ensure error is at least logged to console - // even if default NULL_LOGGER is used... - (LOGGER !== NULL_LOGGER ? LOGGER : console).warn( - this.id, - "unhandled error:", - e - ); - if (this.parent) { - LOGGER.debug(this.id, "unsubscribing..."); - this.unsubscribe(); - this.state = State.ERROR; - } - } - return notified; + // only the wrapped sub's error handler gets a chance + // to deal with the error + const sub = this.wrapped; + const hasErrorHandler = sub && sub.error; + hasErrorHandler && + LOGGER.debug(this.id, "attempting wrapped error handler"); + // flag success if error handler returns true + // (i.e. it could handle/recover from the error) + // else detach this entire sub by going into error state... + return (hasErrorHandler && sub!.error!(e)) || this.unhandledError(e); } - protected addWrapped(sub: Subscription) { - this.subs.push(sub); - this.state = State.ACTIVE; - return sub; + protected unhandledError(e: any) { + // ensure error is at least logged to console + // even if default NULL_LOGGER is used... + (LOGGER !== NULL_LOGGER ? LOGGER : console).warn( + this.id, + "unhandled error:", + e + ); + this.unsubscribe(); + this.state = State.ERROR; + return false; } - protected dispatch(x: B) { - // LOGGER.debug(this.id, "dispatch", x); - this.cacheLast && (this.last = x); - const subs = this.subs; - let n = subs.length; - let s: Partial>; - if (n === 1) { - s = subs[0]; + protected dispatchTo(type: "next" | "done" | "error", x?: B) { + let s: Partial> | undefined = this.wrapped; + if (s) { try { - s.next && s.next(x); + s[type] && s[type]!(x!); } catch (e) { - s.error ? s.error(e) : this.error(e); + // give wrapped sub a chance to handle error + // (if that failed then we're already in error state now & terminate) + if (!this.error(e)) return false; } - } else { - for (; --n >= 0; ) { - s = subs[n]; - try { - s.next && s.next(x); - } catch (e) { - s.error ? s.error(e) : this.error(e); + } + // process other child subs + for (s of type === "next" ? this.subs : [...this.subs]) { + try { + s[type] && s[type]!(x!); + } catch (e) { + if (type === "error" || !s.error || !s.error(e)) { + // if no or failed handler, go into error state + return this.unhandledError(e); } } } + return true; + } + + protected dispatch(x: B) { + LOGGER.debug(this.id, "dispatch", x); + this.cacheLast && (this.last = x); + this.dispatchTo("next", x); } protected dispatchXform(x: A) { @@ -374,15 +330,40 @@ export class Subscription implements ISubscription { try { acc = this.xform![2]([], x); } catch (e) { + // error in transducer can only be handled by the wrapped + // subscriber's error handler (if avail) this.error(e); + // don't dispatch value(s) return; } + if (this.dispatchXformVals(acc)) { + isReduced(acc) && this.done(); + } + } + + protected dispatchXformDone() { + let acc: B[] | Reduced; + try { + // collect remaining values from transducer + acc = this.xform![1]([]); + } catch (e) { + // error in transducer can only be handled by the wrapped + // subscriber's error handler (if avail) + return this.error(e); + } + return this.dispatchXformVals(acc); + } + + protected dispatchXformVals(acc: B[] | Reduced) { const uacc = unreduced(acc); - const n = uacc.length; - for (let i = 0; i < n; i++) { + for ( + let i = 0, n = uacc.length; + i < n && this.state < State.DONE; + i++ + ) { this.dispatch(uacc[i]); } - isReduced(acc) && this.done(); + return this.state < State.ERROR; } protected ensureState() { @@ -391,9 +372,8 @@ export class Subscription implements ISubscription { } } - protected cleanup() { - LOGGER.debug(this.id, "cleanup"); - this.subs.length = 0; + protected release() { + this.subs.clear(); delete this.parent; delete this.xform; delete this.last;