Skip to content

Commit

Permalink
feat(rstream): #281 update Subscription error/teardown logic
Browse files Browse the repository at this point in the history
- replace old `Subscription` class w/ what was recently `Sub2` (removed)
- update/fix done(), subscribe()/unsubscribe() logic
- update related constructs (Stream, StreamSync, MetaStream, etc.)
- update Stream ctor (and factory fns) to support error handler opts arg
- update Timeout error dispatch
- fix typehints
  • Loading branch information
postspectacular committed Mar 10, 2021
1 parent db0ab34 commit a9e4040
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 498 deletions.
4 changes: 2 additions & 2 deletions packages/rstream/src/forkjoin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ export const forkJoin = <IN, MSG, RES, OUT>(
String(id),
opts.src
.transform(map((x: IN) => opts.fork(id, numWorkers, x)))
.subscribe(
tunnel<MSG, RES>({
.subscribe<RES>(
tunnel({
src: opts.worker,
transferables: opts.transferables,
interrupt: opts.interrupt === true,
Expand Down
7 changes: 5 additions & 2 deletions packages/rstream/src/from/promise.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CloseMode, CommonOpts, State } from "../api";
import { CloseMode, State, WithErrorHandlerOpts } from "../api";
import { Stream } from "../stream";
import { optsWithID } from "../utils/idgen";

Expand All @@ -13,7 +13,10 @@ import { optsWithID } from "../utils/idgen";
* @param src -
* @param opts -
*/
export const fromPromise = <T>(src: Promise<T>, opts?: Partial<CommonOpts>) => {
export const fromPromise = <T>(
src: Promise<T>,
opts?: Partial<WithErrorHandlerOpts>
) => {
let canceled = false;
let isError = false;
let err: any = {};
Expand Down
5 changes: 3 additions & 2 deletions packages/rstream/src/from/promises.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { mapcat } from "@thi.ng/transducers";
import type { CommonOpts, ISubscription } from "../api";
import type { ISubscription, WithErrorHandlerOpts } from "../api";
import { optsWithID } from "../utils/idgen";
import { fromPromise } from "./promise";

Expand Down Expand Up @@ -38,10 +38,11 @@ import { fromPromise } from "./promise";
* ```
*
* @param promises -
* @param opts -
*/
export const fromPromises = <T>(
promises: Iterable<T | PromiseLike<T>>,
opts?: Partial<CommonOpts>
opts?: Partial<WithErrorHandlerOpts>
): ISubscription<T[], T> =>
fromPromise(Promise.all(promises), optsWithID("promises", opts)).transform(
mapcat((x: T[]) => x)
Expand Down
16 changes: 8 additions & 8 deletions packages/rstream/src/from/worker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { CommonOpts, LOGGER } from "../api";
import { LOGGER, WithErrorHandlerOpts } from "../api";
import { Stream } from "../stream";
import { optsWithID } from "../utils/idgen";
import { makeWorker } from "../utils/worker";

export interface FromWorkerOpts extends CommonOpts {
export interface FromWorkerOpts extends WithErrorHandlerOpts {
/**
* If true, the worker will be terminated when the stream
* is being closed.
Expand Down Expand Up @@ -43,17 +43,17 @@ export const fromWorker = <T>(
const _worker = makeWorker(worker);
opts = optsWithID("worker", opts);
return new Stream<T>((stream) => {
const ml = (e: MessageEvent) => {
const msgListener = (e: MessageEvent) => {
stream.next(e.data);
};
const el = (e: MessageEvent) => {
const errListener = (e: MessageEvent) => {
stream.error(e.data);
};
_worker.addEventListener("message", ml);
_worker.addEventListener("error", <EventListener>el);
_worker.addEventListener("message", msgListener);
_worker.addEventListener("error", <EventListener>errListener);
return () => {
_worker.removeEventListener("message", ml);
_worker.removeEventListener("error", <EventListener>el);
_worker.removeEventListener("message", msgListener);
_worker.removeEventListener("error", <EventListener>errListener);
if (opts!.terminate !== false) {
LOGGER.info("terminating worker", _worker);
_worker.terminate();
Expand Down
1 change: 0 additions & 1 deletion packages/rstream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ export * from "./stream";
export * from "./stream-merge";
export * from "./stream-sync";
export * from "./subscription";
export * from "./sub2";
export * from "./trigger";
export * from "./tween";

Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/src/metastream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ export class MetaStream<A, B> extends Subscription<A, B> {
}

unsubscribe(sub?: Subscription<B, any>) {
if (this.stream && (!sub || this.subs.length === 1)) {
if (this.stream && (!sub || this.subs.size === 1)) {
this.detach(!sub);
}
return super.unsubscribe(sub);
Expand Down
10 changes: 2 additions & 8 deletions packages/rstream/src/stream-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,7 @@ import {
partitionSync,
PartitionSync,
} from "@thi.ng/transducers";
import {
ISubscribable,
ISubscription,
LOGGER,
State,
TransformableOpts,
} from "./api";
import { ISubscribable, ISubscription, LOGGER, TransformableOpts } from "./api";
import { Subscription } from "./subscription";
import { isFirstOrLastInput } from "./utils/checks";
import { optsWithID } from "./utils/idgen";
Expand Down Expand Up @@ -271,10 +265,10 @@ export class StreamSync<

unsubscribe(sub?: Subscription<B, any>) {
if (!sub) {
LOGGER.debug(this.id, "unsub sources");
for (let s of this.sources.values()) {
s.unsubscribe();
}
this.state = State.DONE;
this.sources.clear();
this.psync.clear();
this.realSourceIDs.clear();
Expand Down
31 changes: 19 additions & 12 deletions packages/rstream/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import {
CommonOpts,
IStream,
ISubscriber,
ISubscription,
LOGGER,
StreamCancel,
StreamSource,
TransformableOpts,
WithErrorHandlerOpts,
} from "./api";
import { Subscription } from "./subscription";
import { optsWithID } from "./utils/idgen";
Expand Down Expand Up @@ -97,27 +99,32 @@ export class Stream<T> extends Subscription<T, T> implements IStream<T> {
protected _cancel: StreamCancel | undefined;
protected _inited: boolean;

constructor(opts?: Partial<CommonOpts>);
constructor(src: StreamSource<T>, opts?: Partial<CommonOpts>);
constructor(opts?: Partial<WithErrorHandlerOpts>);
constructor(src: StreamSource<T>, opts?: Partial<WithErrorHandlerOpts>);
constructor(
src?: StreamSource<T> | Partial<CommonOpts>,
opts?: Partial<CommonOpts>
src?: StreamSource<T> | Partial<WithErrorHandlerOpts>,
opts?: Partial<WithErrorHandlerOpts>
) {
const [_src, _opts] = isFunction(src) ? [src, opts] : [undefined, src];
super(undefined, optsWithID("stream", _opts));
const [_src, _opts] = isFunction(src)
? [src, opts || {}]
: [undefined, src || {}];
super(
_opts.error ? { error: _opts.error } : undefined,
optsWithID("stream", _opts)
);
this.src = _src;
this._inited = false;
}

subscribe<C>(sub: Subscription<T, C>): Subscription<T, C>;
subscribe<C>(sub: ISubscription<T, C>): ISubscription<T, C>;
subscribe(
sub: Partial<ISubscriber<T>>,
opts?: Partial<CommonOpts>
): Subscription<T, T>;
): ISubscription<T, T>;
subscribe<C>(
sub: Partial<ISubscriber<C>>,
opts?: Partial<TransformableOpts<T, C>>
): Subscription<T, C>;
): ISubscription<T, C>;
subscribe(
sub: Partial<ISubscriber<any>>,
opts: Partial<TransformableOpts<any, any>> = {}
Expand All @@ -130,12 +137,12 @@ export class Stream<T> extends Subscription<T, T> implements IStream<T> {
return $sub;
}

unsubscribe(sub?: Subscription<T, any>) {
unsubscribe(sub?: ISubscription<T, any>) {
const res = super.unsubscribe(sub);
if (
res &&
(!sub ||
((!this.subs || !this.subs.length) &&
((!this.subs || !this.subs.size) &&
this.closeOut !== CloseMode.NEVER))
) {
this.cancel();
Expand All @@ -151,7 +158,7 @@ export class Stream<T> extends Subscription<T, T> implements IStream<T> {
}

error(e: any) {
super.error(e);
if (super.error(e)) return true;
this.cancel();
return false;
}
Expand Down
Loading

0 comments on commit a9e4040

Please sign in to comment.