Skip to content

Commit

Permalink
feat(rstream): Add a timeout() subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
andrew8er committed Jul 17, 2018
1 parent 2733684 commit aa55973
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 0 deletions.
33 changes: 33 additions & 0 deletions packages/rstream/src/subs/timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { State } from "../api"
import { Subscription } from "../subscription";

/**
* A subscription that emits an error object after a given time.
*
* @param timeoutMs Timeout value in milliseconds.
* @param error An optional error object. Will use a new instance of `Error` by default
* @param id An optional stream id.
*/
export function timeout<T>(timeoutMs: number, error?: any, id?: string): Subscription<T, T> {
return new Timeout(timeoutMs, error, id);
}

class Timeout<T> extends Subscription<T, T> {
private readonly timeoutId: any;

constructor(timeoutMs: number, error?: any, id?: string) {
super(undefined, undefined, undefined, id || `timeout-${Subscription.NEXT_ID++}`);

this.timeoutId = setTimeout(() => {
if (this.state < State.DONE) {
this.error(error || new Error(`Timeout stream "${this.id}" after ${timeoutMs} ms`))
}
}, timeoutMs);
}

cleanup(): void {
clearTimeout(this.timeoutId);
super.cleanup();
}
}

34 changes: 34 additions & 0 deletions packages/rstream/test/timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import * as assert from "assert";
import { timeout } from "../src/subs/timeout";

describe("Timeout", () => {
it("times out", function(done) {
this.timeout(20);

timeout(10).subscribe({
error: () => done()
})
});

it("times out with error object", function (done) {
this.timeout(20);

const error = 'error object';

timeout(10, error).subscribe({
error: (err) => { assert.equal(err, error); done() }
})
});

it("cancels timeout in cleanup()", function (done) {
this.timeout(40);

timeout(10)
.subscribe({
error: () => assert.fail('timed out'),
})
.unsubscribe();

setTimeout(() => done(), 20)
});
});

0 comments on commit aa55973

Please sign in to comment.