Skip to content

Commit

Permalink
refactor(rstream): replace DEBUG w/ LOGGER impl, add setLogger()
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Apr 24, 2019
1 parent abec897 commit 8587989
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 28 deletions.
8 changes: 6 additions & 2 deletions packages/rstream/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import {
Fn,
Fn0,
IDeref,
IID
IID,
ILogger,
NULL_LOGGER
} from "@thi.ng/api";
import { Transducer } from "@thi.ng/transducers";
import { Stream } from "./stream";
Expand Down Expand Up @@ -66,4 +68,6 @@ export interface IStream<T> extends ISubscriber<T> {
export type StreamCancel = () => void;
export type StreamSource<T> = (sub: Stream<T>) => StreamCancel | void;

export let DEBUG = false;
export let LOGGER = NULL_LOGGER;

export const setLogger = (logger: ILogger) => (LOGGER = logger);
4 changes: 2 additions & 2 deletions packages/rstream/src/from/worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { DEBUG } from "../api";
import { LOGGER } from "../api";
import { Stream } from "../stream";
import { nextID } from "../utils/idgen";
import { makeWorker } from "../utils/worker";
Expand Down Expand Up @@ -42,7 +42,7 @@ export const fromWorker = <T>(
_worker.removeEventListener("message", ml);
_worker.removeEventListener("error", el);
if (terminate) {
DEBUG && console.log("terminating worker", _worker);
LOGGER.info("terminating worker", _worker);
_worker.terminate();
}
};
Expand Down
4 changes: 2 additions & 2 deletions packages/rstream/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Fn, Predicate2 } from "@thi.ng/api";
import { EquivMap } from "@thi.ng/associative";
import { unsupported } from "@thi.ng/errors";
import { Transducer } from "@thi.ng/transducers";
import { DEBUG, ISubscriber } from "./api";
import { ISubscriber, LOGGER } from "./api";
import { Subscription, subscription } from "./subscription";
import { nextID } from "./utils/idgen";

Expand Down Expand Up @@ -132,7 +132,7 @@ export class PubSub<A, B> extends Subscription<A, B> {
}

protected dispatch(x: B) {
DEBUG && console.log(this.id, "dispatch", x);
LOGGER.debug(this.id, "dispatch", x);
const t = this.topicfn(x);
if (t !== undefined) {
const sub = this.topics.get(t);
Expand Down
4 changes: 2 additions & 2 deletions packages/rstream/src/stream-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import {
} from "@thi.ng/transducers";
import {
CloseMode,
DEBUG,
ISubscribable,
LOGGER,
State
} from "./api";
import { Subscription } from "./subscription";
Expand Down Expand Up @@ -212,7 +212,7 @@ export class StreamSync<A, B> extends Subscription<A, B> {
const sub = this.sources.get(src);
if (sub) {
const id = this.invRealSourceIDs.get(src.id);
DEBUG && console.log(`removing src: ${src.id} (${id})`);
LOGGER.info(`removing src: ${src.id} (${id})`);
this.sourceIDs.delete(id);
this.realSourceIDs.delete(id);
this.idSources.delete(src.id);
Expand Down
4 changes: 2 additions & 2 deletions packages/rstream/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import { isString } from "@thi.ng/checks";
import { illegalArity } from "@thi.ng/errors";
import { Transducer } from "@thi.ng/transducers";
import {
DEBUG,
IStream,
ISubscriber,
LOGGER,
StreamCancel,
StreamSource
} from "./api";
Expand Down Expand Up @@ -138,7 +138,7 @@ export class Stream<T> extends Subscription<T, T> implements IStream<T> {

cancel() {
if (this._cancel) {
DEBUG && console.log(this.id, "cancel");
LOGGER.debug(this.id, "cancel");
const f = this._cancel;
delete this._cancel;
f();
Expand Down
4 changes: 2 additions & 2 deletions packages/rstream/src/subs/post-worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { isTransferable } from "@thi.ng/checks";
import { isTypedArray } from "@thi.ng/checks";
import { DEBUG, ISubscriber } from "../api";
import { ISubscriber, LOGGER } from "../api";
import { makeWorker } from "../utils/worker";

/**
Expand Down Expand Up @@ -57,7 +57,7 @@ export const postWorker = <T>(
done() {
if (terminate > 0) {
setTimeout(() => {
DEBUG && console.log("terminating worker...");
LOGGER.info("terminating worker...");
_worker.terminate();
}, terminate);
}
Expand Down
7 changes: 2 additions & 5 deletions packages/rstream/src/subs/resolve.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Fn, IID } from "@thi.ng/api";
import { DEBUG, State } from "../api";
import { LOGGER, State } from "../api";
import { Subscription } from "../subscription";
import { nextID } from "../utils/idgen";

Expand Down Expand Up @@ -52,10 +52,7 @@ export class Resolver<T> extends Subscription<Promise<T>, T> {
this.done();
}
} else {
DEBUG &&
console.log(
`resolved value in state ${this.state} (${x})`
);
LOGGER.warn(`resolved value in state ${this.state} (${x})`);
}
},
(e) => {
Expand Down
4 changes: 2 additions & 2 deletions packages/rstream/src/subs/tunnel.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Fn } from "@thi.ng/api";
import { DEBUG, State } from "../api";
import { LOGGER, State } from "../api";
import { Subscription } from "../subscription";
import { nextID } from "../utils/idgen";
import { makeWorker } from "../utils/worker";
Expand Down Expand Up @@ -106,7 +106,7 @@ export class Tunnel<A, B> extends Subscription<A, B> {
super.done();
if (this.terminate > 0) {
setTimeout(() => {
DEBUG && console.log("terminating workers...");
LOGGER.info("terminating workers...");
this.workers.forEach((worker) => worker && worker.terminate());
delete this.workers;
}, this.terminate);
Expand Down
18 changes: 9 additions & 9 deletions packages/rstream/src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import {
unreduced
} from "@thi.ng/transducers";
import {
DEBUG,
ISubscribable,
ISubscriber,
LOGGER,
State
} from "./api";
import { nextID } from "./utils/idgen";
Expand Down Expand Up @@ -203,7 +203,7 @@ export class Subscription<A, B>
* @param sub
*/
unsubscribe(sub?: Subscription<B, any>) {
DEBUG && console.log(this.id, "unsub start", sub ? sub.id : "self");
LOGGER.debug(this.id, "unsub start", sub ? sub.id : "self");
if (!sub) {
let res = true;
if (this.parent) {
Expand All @@ -214,7 +214,7 @@ export class Subscription<A, B>
return res;
}
if (this.subs) {
DEBUG && console.log(this.id, "unsub child", sub.id);
LOGGER.debug(this.id, "unsub child", sub.id);
const idx = this.subs.indexOf(sub);
if (idx >= 0) {
this.subs.splice(idx, 1);
Expand Down Expand Up @@ -246,7 +246,7 @@ export class Subscription<A, B>
}

done() {
DEBUG && console.log(this.id, "done start");
LOGGER.debug(this.id, "done start");
if (this.state < State.DONE) {
if (this.xform) {
const acc = this.xform[1]([]);
Expand All @@ -261,7 +261,7 @@ export class Subscription<A, B>
s.done && s.done();
}
this.unsubscribe();
DEBUG && console.log(this.id, "done");
LOGGER.debug(this.id, "done");
}
}

Expand All @@ -277,9 +277,9 @@ export class Subscription<A, B>
}
}
if (!notified) {
console.log(this.id, "unhandled error:", e);
LOGGER.warn(this.id, "unhandled error:", e);
if (this.parent) {
DEBUG && console.log(this.id, "unsubscribing...");
LOGGER.debug(this.id, "unsubscribing...");
this.unsubscribe();
this.state = State.ERROR;
}
Expand All @@ -293,7 +293,7 @@ export class Subscription<A, B>
}

protected dispatch(x: B) {
DEBUG && console.log(this.id, "dispatch", x);
// LOGGER.debug(this.id, "dispatch", x);
this.last = x;
const subs = this.subs;
let s: ISubscriber<B>;
Expand Down Expand Up @@ -323,7 +323,7 @@ export class Subscription<A, B>
}

protected cleanup() {
DEBUG && console.log(this.id, "cleanup");
LOGGER.debug(this.id, "cleanup");
this.subs.length = 0;
delete this.parent;
delete this.xform;
Expand Down

0 comments on commit 8587989

Please sign in to comment.