Skip to content

Commit

Permalink
fix(rstream): MetaStream close mode handling
Browse files Browse the repository at this point in the history
- never go into DONE state if `closeIn == CloseMode.NEVER`
- fix/update unsubscribe() & pass arg
- update detach() to consider `closeOut` mode
- add tests
  • Loading branch information
postspectacular committed May 3, 2020
1 parent d177553 commit 2d9e907
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 11 deletions.
24 changes: 13 additions & 11 deletions packages/rstream/src/metastream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { assert, Fn } from "@thi.ng/api";
import { CommonOpts, State } from "./api";
import { CloseMode, CommonOpts, State } from "./api";
import { Subscription } from "./subscription";
import { optsWithID } from "./utils/idgen";

Expand Down Expand Up @@ -123,30 +123,32 @@ export class MetaStream<A, B> extends Subscription<A, B> {
}
},
error: (e) => super.error(e),
__owner: this
__owner: this,
});
}
}
}

done() {
if (this.stream) {
this.detach();
this.detach(true);
}
super.done();
this.closeIn !== CloseMode.NEVER && super.done();
}

unsubscribe(sub?: Subscription<B, any>) {
if (this.stream && (!sub || this.subs.length === 1)) {
this.detach();
this.detach(!sub);
}
return super.unsubscribe();
return super.unsubscribe(sub);
}

protected detach() {
assert(!!this.stream, "input stream already removed");
this.stream!.unsubscribe(this.sub);
delete this.stream;
delete this.sub;
protected detach(force: boolean) {
if (force || this.closeOut !== CloseMode.NEVER) {
assert(!!this.stream, "input stream already removed");
this.stream!.unsubscribe(this.sub);
delete this.stream;
delete this.sub;
}
}
}
74 changes: 74 additions & 0 deletions packages/rstream/test/metastream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import * as assert from "assert";
import { CloseMode, fromIterable, metaStream, State } from "../src/index";
import { TIMEOUT } from "./config";

describe("MetaStream", () => {
it("basic", (done) => {
const src = fromIterable([1, 2, 3], { delay: TIMEOUT });
const meta = metaStream<number, number>((x) =>
fromIterable([x * 10, x * 20, x * 30], { delay: TIMEOUT >> 2 })
);
const sub = src.subscribe(meta);
const acc: number[] = [];
const sub2 = sub.subscribe({
next(x) {
acc.push(x);
},
});
setTimeout(() => {
assert.deepEqual(acc, [10, 20, 30, 20, 40, 60, 30, 60, 90]);
assert.equal(meta.getState(), State.DONE);
assert.equal(sub.getState(), State.DONE);
assert.equal(sub2.getState(), State.DONE);
done();
}, 5 * TIMEOUT);
});

it("closein", (done) => {
const src = fromIterable([1], { delay: TIMEOUT });
const meta = metaStream((x) => fromIterable([x]), {
closeIn: CloseMode.NEVER,
});
const sub = src.subscribe(meta);
const child = sub.subscribe({
next(x) {
console.log(x);
},
});
setTimeout(() => {
assert.equal(src.getState(), State.DONE);
assert.equal(meta.getState(), State.ACTIVE);
assert.equal(sub.getState(), State.ACTIVE);
assert.equal(child.getState(), State.IDLE);
done();
}, 3 * TIMEOUT);
});

it("closeout", (done) => {
const src = fromIterable([1], { delay: TIMEOUT });
const meta = src.subscribe(
metaStream((x) => fromIterable([x * 10]), {
closeIn: CloseMode.NEVER,
closeOut: CloseMode.NEVER,
})
);
const acc: number[] = [];
const child = meta.subscribe({
next(x) {
acc.push(x);
},
});
setTimeout(() => {
child.unsubscribe();
assert.equal(src.getState(), State.DONE);
assert.equal(meta.getState(), State.ACTIVE);
meta.subscribe({
next(x) {
acc.push(x);
},
});
assert.deepEqual(acc, [10, 10]);
done();
}, 3 * TIMEOUT);
});
});

0 comments on commit 2d9e907

Please sign in to comment.