-
-
Notifications
You must be signed in to change notification settings - Fork 151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(rstream): Add a timeout() subscription #30
feat(rstream): Add a timeout() subscription #30
Conversation
Hi @andrew8er - thanks & that's a useful addition! Though I'd think we don't actually need a new type/class for this since this can easily be achieved via existing constructs: const timeout = (delay: number, error?: any, id?: string) =>
new Stream(
(s) => {
const timerID = setTimeout(() => {
s.error(error || new Error(`Timeout stream "${s.id}" after ${delay} ms`));
}, delay);
return () => clearTimeout(timerID)
},
id || `timout-${Stream.NEXT_ID++}`
); Haven't checked this for correctness, but something along these lines should work the same... |
I thought it would be a nice addition. I use it often as an intermediate subscription to add a cap on the total time of an operation. It is not meant to be used by itself, more like |
I agree and am all up for merging it, but could you please verify if that above |
I can verify it in a running app on Monday, but the implementation looks good to me. |
Actually, sorry, I just realised what you meant with your comment. Maybe it'd be good to have both versions, since i can think of a few use cases where this can also be handy as a stream source. |
Aren't both versions functionaly equivalent? |
Been thinking about this and yes, they actually are (need more coffee, it seems 😄 ). Just played with this some more and thought it might be more flexible if the timeout is reset with each received value before the timeout period, like this: export class Timeout<T> extends Subscription<T, T> {
protected timeoutMs: any;
protected timeoutId: any;
protected errorObj: any;
constructor(timeoutMs: number, error?: any, id?: string) {
super(undefined, undefined, undefined, id || `timeout-${Subscription.NEXT_ID++}`);
this.timeoutMs = timeoutMs;
this.errorObj = error;
this.reset();
}
next(x: T) {
clearTimeout(this.timeoutId);
this.reset();
super.next(x);
}
reset() {
this.timeoutId = setTimeout(() => {
if (this.state < State.DONE) {
this.error(
this.errorObj ||
new Error(`Timeout stream "${this.id}" after ${this.timeoutMs} ms`)
);
}
}, this.timeoutMs);
}
cleanup(): void {
clearTimeout(this.timeoutId);
super.cleanup();
}
} What do you think? |
I thought of this too, but wanted to make it either optional or as a separate implementation. My use case requieres a total cap on the stream time, but others might benefit from this. |
To avoid code duplication I'd opt for a next(x: T) {
if (this.resetTimeout) {
clearTimeout(this.timeoutId);
this.reset();
}
super.next(x);
} |
Happy to use |
Looks good to me! |
Do you mind if I add this new updated version then instead of your original PR? Got time this afternoon... |
Sure thing!
|
This PR adds a
timeout()
subscription. It sends an error down the stream after a configurable amount of time.