Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rstream): Add a timeout() subscription #30

Merged

Conversation

andrew8er
Copy link
Contributor

This PR adds a timeout() subscription. It sends an error down the stream after a configurable amount of time.

@postspectacular
Copy link
Member

Hi @andrew8er - thanks & that's a useful addition! Though I'd think we don't actually need a new type/class for this since this can easily be achieved via existing constructs:

const timeout = (delay: number, error?: any, id?: string) =>
    new Stream(
        (s) => {
            const timerID = setTimeout(() => {
                s.error(error || new Error(`Timeout stream "${s.id}" after ${delay} ms`));
            }, delay);
            return () => clearTimeout(timerID)
        },
        id || `timout-${Stream.NEXT_ID++}`
    );

Haven't checked this for correctness, but something along these lines should work the same...

@andrew8er
Copy link
Contributor Author

andrew8er commented Jul 20, 2018

I thought it would be a nice addition. I use it often as an intermediate subscription to add a cap on the total time of an operation. It is not meant to be used by itself, more like resolve().

@postspectacular
Copy link
Member

I agree and am all up for merging it, but could you please verify if that above Stream based alternative is sufficient? I'd just rather try to keep the number of classes down. Thanks! :)

@andrew8er
Copy link
Contributor Author

I can verify it in a running app on Monday, but the implementation looks good to me.

@postspectacular
Copy link
Member

Actually, sorry, I just realised what you meant with your comment. Maybe it'd be good to have both versions, since i can think of a few use cases where this can also be handy as a stream source.

@andrew8er
Copy link
Contributor Author

Aren't both versions functionaly equivalent?

@postspectacular
Copy link
Member

postspectacular commented Jul 20, 2018

Been thinking about this and yes, they actually are (need more coffee, it seems 😄 ). Just played with this some more and thought it might be more flexible if the timeout is reset with each received value before the timeout period, like this:

export class Timeout<T> extends Subscription<T, T> {
    protected timeoutMs: any;
    protected timeoutId: any;
    protected errorObj: any;

    constructor(timeoutMs: number, error?: any, id?: string) {
        super(undefined, undefined, undefined, id || `timeout-${Subscription.NEXT_ID++}`);
        this.timeoutMs = timeoutMs;
        this.errorObj = error;
        this.reset();
    }

    next(x: T) {
        clearTimeout(this.timeoutId);
        this.reset();
        super.next(x);
    }

    reset() {
        this.timeoutId = setTimeout(() => {
            if (this.state < State.DONE) {
                this.error(
                    this.errorObj ||
                    new Error(`Timeout stream "${this.id}" after ${this.timeoutMs} ms`)
                );
            }
        }, this.timeoutMs);
    }

    cleanup(): void {
        clearTimeout(this.timeoutId);
        super.cleanup();
    }
}

What do you think?

@andrew8er
Copy link
Contributor Author

I thought of this too, but wanted to make it either optional or as a separate implementation. My use case requieres a total cap on the stream time, but others might benefit from this.

@postspectacular
Copy link
Member

To avoid code duplication I'd opt for a resetTimeout option to support both cases:

next(x: T) {
    if (this.resetTimeout) {
        clearTimeout(this.timeoutId);
        this.reset();
    }
    super.next(x);
}

@postspectacular
Copy link
Member

Happy to use resetTimeout=false by default, since that's probably the more common scenario...

@andrew8er
Copy link
Contributor Author

Looks good to me!

@postspectacular
Copy link
Member

Do you mind if I add this new updated version then instead of your original PR? Got time this afternoon...

@andrew8er
Copy link
Contributor Author

andrew8er commented Jul 20, 2018 via email

@postspectacular postspectacular merged commit 7b10e0c into thi-ng:master Jul 20, 2018
@andrew8er andrew8er deleted the feature/rstream-add-timeout branch August 27, 2018 14:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants