Add timeout operator to Flow #2624

Closed
pablobaxter opened this issue Apr 4, 2021 · 13 comments

@pablobaxter
Contributor

Enhancement: Add timeout operator to Flow

This is in response to the updated contribution guidelines, and to push for a timeout mechanism for Flow (#2597). Currently, there is no easy way to time out a flow if the upstream takes too long to produce an event.

Use case:
A flow, when collected, may not emit items at a regular interval. If the flow relies on a resource-intensive process or keeps some hardware enabled, leaving it active with no events for an extended period is wasteful. A timeout operator that fires when no events have been emitted for a given time lets flows carry a defined timeout strategy, so these resources aren't accidentally wasted.

@qwwdfsad qwwdfsad added the flow label Apr 6, 2021
@joffrey-bion
Contributor

joffrey-bion commented Apr 6, 2021

I'm not against such an operator at all (on the contrary), but I am not sure this specific use case is a good example for it.

This use case seems to need the timeout on the consumer side, and for this I wonder why a withTimeout around the collect call is not sufficient. Could you please elaborate?

I would imagine that a .timeout() operator for a flow would be useful on the producer side instead, so that a returned flow can be forced to have a timeout. For instance, if the producer knows that the events are too irregular and prefers to send "stub" events or something to indicate it's taking too long, it would return a Flow with the timeout operator already set up.

@pablobaxter
Contributor Author

To give a more specific use-case, I'll go with one for Android:

With GPS location on Android, it's not always guaranteed that location events come in at the desired interval (it could vary based on the environment). It's even possible that no GPS location will come in if there is no clear signal. However, even if no signal is available, the GPS radio will remain on, draining battery power. To prevent this, you would use a timeout on the location events coming in, and if there are none after a period of time, turn off the radio.

An example of what a location emitting flow would look like (without a timeout):

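import android.content.Context
import android.location.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

fun getLocationFlow(context: Context): Flow<Location> {
    val locationManager = context.getSystemService(Context.LOCATION_SERVICE) as LocationManager
    return callbackFlow {
        // Forward each fix into the flow; trySendBlocking replaces the deprecated sendBlocking.
        val listener = LocationListener { trySendBlocking(it) }
        locationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER, 0L, 0F, listener)
        // Suspend until the collector cancels, then unregister so the GPS radio can power down.
        awaitClose { locationManager.removeUpdates(listener) }
    }.buffer(Channel.UNLIMITED)
}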

Every consumer of this flow could have a different requirement for what they consider to be a timeout event. One example is that these locations may not be accurate enough. In this case, a filter would be added to the flow, which filters out inaccurate locations.

getLocationFlow(context)
    .filter(::locationFilteringLogic)

If locations are emitted regularly until we reach a bad-signal area, where new locations become inaccurate enough to be filtered out, I would want a timeout to occur if I received no location for the last 10 seconds. Adding the timeout to the producer wouldn't work, because it could be emitting location data regularly even though those locations are filtered out downstream. Using a withTimeout around collect would time out the entire flow, even if I am still receiving valid locations. In this case, I would use a timeout just after the filter, which would either throw an exception when triggered or perform some other action of my choosing.
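
For illustration, an operator along these lines could be sketched with existing primitives. This is not the code from the PR: `idleTimeout`, `IdleTimeoutException`, and the `action` parameter are hypothetical names, and the sketch assumes kotlinx.coroutines 1.5 or later (for `onReceiveCatching`). Note that the timer here restarts each time the operator starts waiting for the next value.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.*

// Hypothetical exception type for this sketch; not part of kotlinx.coroutines.
class IdleTimeoutException(message: String) : Exception(message)

// Hypothetical operator: runs `action` if upstream stays silent for `timeoutMillis`.
// Upstream is cancelled before `action` runs, so producer cleanup (e.g. awaitClose) is triggered.
@OptIn(ExperimentalCoroutinesApi::class) // select's onTimeout clause is experimental
fun <T> Flow<T>.idleTimeout(
    timeoutMillis: Long,
    action: suspend FlowCollector<T>.() -> Unit = {
        throw IdleTimeoutException("No value for $timeoutMillis ms")
    }
): Flow<T> = flow {
    coroutineScope {
        // Collect upstream in a child coroutine that feeds a channel.
        val upstream = this@idleTimeout.produceIn(this)
        while (true) {
            // Wait for the next value or the timeout, whichever comes first.
            val result = select<ChannelResult<T>?> {
                upstream.onReceiveCatching { it }
                onTimeout(timeoutMillis) { null }
            }
            if (result == null) {
                upstream.cancel()   // stop the producer first
                this@flow.action()  // then throw, emit a fallback, or simply return
                break
            }
            if (result.isClosed) {
                result.exceptionOrNull()?.let { throw it } // upstream failed
                break                                       // upstream completed
            }
            emit(result.getOrThrow())
        }
    }
}

fun main() = runBlocking {
    val source = flow {
        emit(1); delay(10); emit(2)
        delay(5_000) // long silence, longer than the timeout below
        emit(3)
    }
    // Emits a fallback value instead of throwing when the source goes quiet.
    source.idleTimeout(300) { emit(-1) }.collect { println(it) }
}
```

Placed after the filter, as in `getLocationFlow(context).filter(::locationFilteringLogic).idleTimeout(10_000)`, the timer would only see locations that survive filtering, which is the behaviour described above.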

@joffrey-bion
Contributor

joffrey-bion commented Apr 7, 2021

> However, even if no signal is available, the GPS radio will remain on, draining battery power. To prevent this, you would use a timeout on the location events coming in, and if there are none after a period of time, turn off the radio.

The requirement seems reasonable, but the solution doesn't seem to help with this problem, as the upstream would probably not be affected by the timeout at all (at least in the PR, I didn't see any mechanism to re-collect the upstream flow on timeout).
So invokeOnClose won't be called on timeout AFAICT.

> Using a withTimeout around collect would time out the entire flow, even if I am still receiving valid locations

Exactly, and that is the point: cancel the flow so that the producer can clean up resources. If you want to re-subscribe later, you can always re-collect that flow.

In any case, in this example I don't believe there is a way for you to know that new valid locations are available again after a timeout without keeping the location service on in the first place. Or am I missing something?

@pablobaxter
Contributor Author

> The requirement seems reasonable, but the solution doesn't seem to help with this problem, as the upstream would probably not be affected by the timeout at all (at least in the PR, I didn't see any mechanism to re-collect the upstream flow on timeout).
> So invokeOnClose won't be called on timeout AFAICT.

In the PR, an exception is thrown, which would close the entire flow, unless the flow was a SharedFlow, in which case only the downstream would be closed. Even if you override the default (which throws a public-facing FlowTimeoutException), internally the flow has already ended because the InternalFlowTimeoutException was thrown.

> Exactly, and that is the point: cancel the flow so that the producer can clean up resources. If you want to re-subscribe later, you can always re-collect that flow.

The withTimeout block would cancel the flow if collect suspended for longer than the given time. So if I wrapped the above flow in a withTimeout block of 5 seconds, I would collect 5 seconds of locations before the entire flow is cancelled, regardless of whether the flow was still emitting. That doesn't solve my problem: I want to collect for as long as the GPS is emitting. Only if the GPS radio stops emitting locations for 5+ seconds do I want to turn it off and handle that event separately.
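
To make the difference concrete, here is a minimal sketch of the withTimeout behaviour. `steadySource` and `collectWithDeadline` are hypothetical helpers written for this example; the source never goes quiet, yet collection is still cut off once the overall deadline passes.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Stand-in for a source that keeps emitting regularly (e.g. a healthy GPS signal).
fun steadySource(periodMillis: Long): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(periodMillis)
    }
}

// Collects `source` inside withTimeoutOrNull.
// Returns the values seen and whether the overall deadline cut collection short.
suspend fun <T> collectWithDeadline(source: Flow<T>, deadlineMillis: Long): Pair<List<T>, Boolean> {
    val seen = mutableListOf<T>()
    val finished = withTimeoutOrNull(deadlineMillis) {
        source.collect { seen += it }
        true
    }
    return seen to (finished == null)
}

fun main() = runBlocking {
    val (seen, timedOut) = collectWithDeadline(steadySource(50), 500)
    // The source was still emitting every 50 ms when the 500 ms deadline cancelled it.
    println("collected ${seen.size} values, timedOut=$timedOut")
}
```

The deadline bounds the total duration of collect, not the gap between two values, so it cannot express "stop only when nothing has arrived for N seconds".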

> In any case, in this example I don't believe there is a way for you to know that new valid locations are available again after a timeout without keeping the location service on in the first place. Or am I missing something?

Well... I could leave the radio on, but users would be upset about the battery drain. :-) In the above example, if the timeout triggered, I would probably set up a different trigger (timer, geofence, activity recognition) to notify me when to retry.

@pablobaxter
Contributor Author

Actually, this conversation is showing me a bug in my implementation. This breaks the retry operator... I'm not sure if it's because I'm using a callbackFlow or something else.

@pablobaxter
Contributor Author

What I thought was a bug: if the timeout emits an item instead of throwing an exception, the retry operators don't work (a similar thing happens if you apply catch before the retry operator).

It wasn't a bug, but I did see I was misusing scopedFlow a bit, so I made some corrections to my PR.

@joffrey-bion
Contributor

> In the PR, an exception is being thrown, which would close the entire flow

Ok, I think I had missed the initial intent of the timeout operator then. For some reason I expected continuity: in my mind the timeout action was just an extra hook invoked when too much time passes since the last received value, without implying cancellation of the upstream, so that further values could still be received from the same upstream.
But I guess that could be achieved with a combination of timeout + retry.
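
A rough sketch of that combination, under some loud assumptions: the timeout here is applied on the producer side with withTimeoutOrNull around each read (not with the operator from the PR), and `FlakySensor` and `IdleTimeoutException` are hypothetical stand-ins. The only library operator used is retry, which re-collects the upstream when the predicate matches.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical exception type for this sketch; not part of kotlinx.coroutines.
class IdleTimeoutException : Exception("no reading within the timeout")

// Stand-in sensor: every collection opens a new session, and the first session
// goes silent on its second reading.
class FlakySensor {
    var sessions = 0
        private set

    fun readings(timeoutMillis: Long, count: Int): Flow<String> = flow {
        val session = ++sessions
        for (n in 1..count) {
            // Fail this collection if a single reading takes longer than the timeout.
            val reading = withTimeoutOrNull(timeoutMillis) { read(session, n) }
                ?: throw IdleTimeoutException()
            emit(reading)
        }
    }

    private suspend fun read(session: Int, n: Int): String {
        delay(if (session == 1 && n == 2) 5_000L else 10L) // simulated loss of signal
        return "session $session, reading $n"
    }
}

fun main() = runBlocking {
    val sensor = FlakySensor()
    sensor.readings(timeoutMillis = 300, count = 2)
        // Re-collect once, but only when the failure was the idle timeout.
        .retry(retries = 1) { cause -> cause is IdleTimeoutException }
        .collect { println(it) }
    println("sessions opened: ${sensor.sessions}")
}
```

Values emitted before the timeout are not replayed or removed, so the collector sees the first session's reading followed by everything from the second session.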

> if I wrapped the above flow in a withTimeout block with 5 seconds, I would collect 5 seconds of location before the entire flow is cancelled, regardless if the flow was still emitting or not.

Oh, right. Obvious oversight on my part, sorry about that.

@pablobaxter
Contributor Author

> In the PR, an exception is being thrown, which would close the entire flow

Just as an FYI, this is no longer the case. The flow is still cancelled, both upstream and downstream (so awaitClose will be called), but it now follows a pattern similar to the other Delay operators. The timeout + retry combination still works, though.

> Oh, right. Obvious oversight on my part, sorry about that.

No need for apologies. We're human, not machines... or are you?

@qwwdfsad
Contributor

qwwdfsad commented Jul 30, 2021

Sorry for being late to the party and thanks for the productive discussion!

The overall idea seems worth integrating (especially now that such resources can be shared via shareIn and have to be disconnected somehow), and the idea of a lambda with a FlowCollector receiver is well aligned with our API shape and goals.

The only thing that concerns me is the naming: it's easy to confuse with withTimeout and bufferTimeout (the latter is from Project Reactor). I do not have better alternatives in mind right now, but this is something I'm going to research and discuss in our design meetings prior to 1.6.0. Stay tuned!

@pablobaxter
Contributor Author

> Sorry for being late to the party and thanks for the productive discussion!
>
> The overall idea seems worth integrating (especially now that such resources can be shared via shareIn and have to be disconnected somehow), and the idea of a lambda with a FlowCollector receiver is well aligned with our API shape and goals.
>
> The only thing that concerns me is the naming: it's easy to confuse with withTimeout and bufferTimeout (the latter is from Project Reactor). I do not have better alternatives in mind right now, but this is something I'm going to research and discuss in our design meetings prior to 1.6.0. Stay tuned!

I had a similar concern about the naming when I created the PR for this, but I figured that given this would be an operator on Flow, the context of the function name would make sense. For example, withTimeout can't be used outside of a suspend function, and bufferTimeout can't be used as an operator outside of a Flux.

If you need an argument in your design meetings in favor of timeout(), feel free to use the above. Just providing context on why I went with using that name.

@pablobaxter
Contributor Author

@qwwdfsad any update on this?

@pablobaxter
Contributor Author

Not trying to bug too much on this issue, just a small nudge to get this back in mind for folks, @qwwdfsad.

@qwwdfsad
Contributor

qwwdfsad commented Sep 8, 2022

Sorry folks, we are stretched thin right now and cannot really get back to this issue, yet it's something on our "to do in the observable future" radar.


3 participants