Support natural batching in flow #902
Comments
The above statement is reasonable but has to be proven for at least one channel type (i.e. for an array channel). In my opinion a |
@mtopolnik Can you, please, share a use case you have in mind? I do want to study those, because I'm also reluctant to add this kind of API to channels. These kinds of performance optimizations are better kept hidden, as implementation details of higher-level APIs. Let me explicitly mention #254 here. With an abstraction for cold streams in place, data flows become declarative -- you declare your data processing pipeline and then launch it, without even having to explicitly use communication primitives like channels, etc. So, if your use-case is to generate a sequence of data items in one thread and consume them in another, then you would be able to use the following hypothetical API to do it:
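(Illustrative sketch only: the comment's original example is not preserved in this extract and predates Flow, using a hypothetical cold-stream API; today's Flow equivalent of "declare the pipeline, then launch it" looks roughly like this.)

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    flow {
        repeat(1_000) { emit(it) }          // generate items lazily, in one context
    }
        .flowOn(Dispatchers.Default)        // production runs on a background dispatcher
        .collect { println(it) }            // consumption runs here, in the caller's context
}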
Now, behind the scenes, |
The issue is not the efficiency of inter-thread communication, but what you want to do with the items. Natural batching applies to any situation where you're performing an operation that:
This fits many use cases, basically every example where you deal with a remote system. It could be a database If all I have is I can't build upon |
Ok. Let me rephrase this, again, as a feature request for #254. You want to have an API like this:
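(The example itself is not preserved in this extract. A hypothetical restatement of the requested shape, with illustrative names and the body deliberately left unimplemented, might be:)

import kotlinx.coroutines.flow.Flow

// Illustrative signature only: invoke the consumer function once per naturally
// formed batch; the name `consumeBatches` and its parameters are not from the
// original comment.
suspend fun <T> Flow<T>.consumeBatches(maxBatchSize: Int, consumer: (List<T>) -> Unit): Unit = TODO()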
Yes, that would go a long way. One stumbling point here is the question of the ownership of the batch collection. On the one hand there's the concern of robustness: surrendering the ownership to the consumer function on each invocation is the safest option. On the other hand we have GC concerns: we'd like to reuse the same collection for all invocations of the consumer function. It's hard to get both concerns satisfied in a simple API. Possibly the user could supply his own collection as a first, optional argument to |
@mtopolnik I would rather avoid the issue of ownership in an easy-to-use public API (provide a fresh instance every time) to make it less error-prone, but design some lower-level APIs for advanced users that would allow them to write their own operator implementations, optimized in whatever way they see fit.
Hi @mtopolnik

suspend fun <E : Any> Channel<E>.consumeEachBlock(maxBlockSize: Int, consumer: (List<E>) -> Unit) {
    consume {
        val buffer = ArrayList<E>(maxBlockSize)
        while (coroutineContext[Job]?.isActive != false) {
            // Suspend for the first element; stop when the channel is closed.
            buffer += receiveOrNull() ?: return
            // Then grab whatever is already buffered, up to the block size, without suspending.
            while (buffer.size < maxBlockSize) {
                val element = poll() ?: break
                buffer += element
            }
            consumer(buffer)
            buffer.clear()
        }
    }
}
@fvasco This code looks very similar to the code I wrote in the issue description. I can't make out what it improves.
In order to make it efficient, one indeed needs a built-in operation on channels that polls (drains?) multiple elements at once into a user-provided collection. That could help reduce the amount of internal synchronization.
I went once again to inspect Agrona's queue, which I consider the canonical implementation of a zero-contention SPSC queue. All operations are done with the The only saving that I see is that the loop in |
@mtopolnik Unfortunately, channels are MPMC, so adding "drain" to them would offer only so much benefit. For real scalability we'll need #254, which could use SPSC queues behind the scenes for buffers and for switching between threads.
As a cross-check, Agrona's |
@mtopolnik Our channel implementation could offer a bit of benefit over calling poll in a loop, but that benefit would be small compared to moving from MPMC to SPSC.
Sorry, I forgot it :-(
See the note here: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
I propose to consider also Generally these primitives can be reconsidered implementing |
So basically we are postponing this feature until #254, and then we will (probably) be able to provide a consistent API for both cold and hot streams. We will keep this feature request in mind while designing it, thanks for pointing this out! It will most probably be an extension, in order to help use cases similar to the original one, where most of the performance win comes from cutting down the cost of per-item processing setup.
What is the value of this operator? What problem does it solve that justifies introducing it into the library?
Reducing synchronization costs (adding a whole block of elements to an ArrayChannel at once)
Do you have a real-world problem where performance-critical code is dominated by the lock contention in |
None for |
This is what I have come up with now, using flows:
So it is meant to be used as |
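(The snippet itself is not preserved in this extract. A compact sketch in the same spirit, with an illustrative operator name and default limit: route the upstream through a channel and emit whatever has accumulated since the last batch.)

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*

fun <T> Flow<T>.naturalChunks(maxChunkSize: Int = 128): Flow<List<T>> = flow {
    coroutineScope {
        // Buffer the upstream into a channel so elements can pile up while we emit.
        val channel = this@naturalChunks.buffer(maxChunkSize).produceIn(this)
        for (first in channel) {                         // suspend for the first element
            val chunk = ArrayList<T>(maxChunkSize)
            chunk += first
            while (chunk.size < maxChunkSize) {          // then sweep up what is already buffered
                chunk += channel.tryReceive().getOrNull() ?: break
            }
            emit(chunk)
        }
    }
}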
Can we "unpostpone" this feature since #254 is closed? Upd.:
Upd. 2: The only implementation of such batching I know of from the reactive world is @akarnokd's coalesce |
I think we can manage to implement it as part of #1302. That is, a single |
@circusmagnus We have not yet nailed down the design of the API for flow |
Ok.
@elizarov Here you mentioned
Unless I misunderstand something, natural batching cannot be sequential and implies concurrency, just like
For the sake of discussion in #1302 I would like to ask:
Also, I would suggest you consider the natural batching use case for |
@pacher In principle, I agree. When I was looking for this functionality in the existing APIs, one of my first attempts was to see if I could get
However, I don't think your proposal is doable: it's not possible for
From my point of view, a non-sequential
Good point about the return type, @jGleitz, I don't know how I missed that. Chunked will have a List in its return type by design. However, the question about |
You are right. I had not really thought it through initially. Natural batching does require concurrency.
The name of |
I use this:
Does the job for me.
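(The snippet itself is not preserved in this extract; per the reply below, it did simple size-based chunking, roughly along these lines -- names are illustrative only.)

import kotlinx.coroutines.flow.*

// Plain size-based chunking (not natural batching): emit a list every `size` elements.
fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> = flow {
    var chunk = ArrayList<T>(size)
    collect { element ->
        chunk += element
        if (chunk.size == size) {
            emit(chunk)
            chunk = ArrayList(size)
        }
    }
    if (chunk.isNotEmpty()) emit(chunk)   // flush the incomplete tail
}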
@schikin As far as I can tell, your snippet does simple size-based chunking. This issue is about natural batching, which is very different.
@pacher Got it. It's just that I think people Googling for simple batching with Kotlin Flows (like this https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#buffer-int-) end up in this thread.
Any progress on this? A simple receiveMany(maxCount) would help my case a lot.
I've used the following for "natural" buffering, works well for my needs:

fun <A> Flow<A>.chunked(): Flow<List<A>> = channelFlow {
    var batch = mutableListOf<A>()
    collectLatest {
        batch.add(it)
        // send suspends until the downstream is ready (RENDEZVOUS buffer); if a new
        // element arrives first, collectLatest cancels the send, the new element is
        // added to the same batch, and the send is retried, so batches grow while
        // the consumer is busy.
        send(batch)
        batch = mutableListOf()
    }
}.buffer(Channel.RENDEZVOUS)
^ Smart one. I do have a more low-level solution, which might be a little more efficient, without coroutine cancellation and such. A helper function to drain a channel of all its elements, without suspending:
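(The original snippet is not preserved in this extract; a sketch of such a helper, with illustrative names, might be:)

import kotlinx.coroutines.channels.ReceiveChannel

// Grab whatever is already buffered in the channel, without ever suspending,
// and return it as a chunk capped at maxChunkSize.
fun <T> ReceiveChannel<T>.drainChunk(maxChunkSize: Int, into: MutableList<T> = ArrayList(maxChunkSize)): List<T> {
    while (into.size < maxChunkSize) {
        into += tryReceive().getOrNull() ?: break   // null: channel is empty (or closed)
    }
    return into
}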
^ It returns a chunk once all elements currently present have been drained from the channel, or once we reach the maximum chunk size. Another helper to wait for the first element in the channel and then drain it:
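(Again a sketch only, building on drainChunk above; the original code is not preserved here.)

import kotlinx.coroutines.channels.ReceiveChannel

// Suspend until at least one element arrives, then sweep up everything else
// that is already buffered.
suspend fun <T> ReceiveChannel<T>.awaitChunk(maxChunkSize: Int): List<T>? {
    val first = receiveCatching().getOrNull() ?: return null   // null: channel closed
    val chunk = ArrayList<T>(maxChunkSize)
    chunk += first
    return drainChunk(maxChunkSize, into = chunk)
}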
and finally a flow operator:
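(A sketch of what that operator could look like, wiring the two helpers above together; the operator name and default chunk size are illustrative, not from the original comment.)

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*

fun <T> Flow<T>.chunkedNaturally(maxChunkSize: Int = 128): Flow<List<T>> = flow {
    coroutineScope {
        // Buffer the upstream into a channel so elements can accumulate while we emit.
        val upstream = this@chunkedNaturally.buffer(maxChunkSize).produceIn(this)
        while (true) {
            val chunk = upstream.awaitChunk(maxChunkSize) ?: break   // null: upstream finished
            emit(chunk)
        }
    }
}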
Check my implementation 😁 (#FlowExt) Sent from my 2201117TG using FastHub
@hoc081098 See an example snippet from @elizarov for a "reference" 😉 implementation.
Natural (aka smart) batching is a technique in stream processing that optimizes throughput without affecting latency. Using a concurrent queue as an example: the consumer has the ability to atomically drain all the items observed at some instant and then process them as a batch. Ideally, the queue should be bounded, giving an upper limit to the batch size and providing backpressure to the sender at the same time.
It's called "natural" batching because there's no imposed batch size: when the traffic is low, it will process each item as soon as it arrives. In that case you don't need any throughput optimizations by batching items together. When the traffic gets higher, the consumer will automatically start processing larger batches, amortizing the fixed latency of a single operation like a database INSERT.
I wrote this sample code that achieves the basic goal:
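(The original sample is not reproduced in this extract; judging from the discussion, it was presumably along these lines: receive one item, keep polling without suspending until the channel looks empty or the batch limit is hit, then hand the batch to the caller.)

import kotlinx.coroutines.channels.ReceiveChannel

suspend fun <T> ReceiveChannel<T>.consumeBatched(batchLimit: Int, handleItems: (List<T>) -> Unit) {
    while (true) {
        val batch = ArrayList<T>(batchLimit)
        batch += receiveCatching().getOrNull() ?: return   // channel closed: we are done
        while (batch.size < batchLimit) {
            batch += tryReceive().getOrNull() ?: break     // nothing buffered right now
        }
        handleItems(batch)
    }
}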
We can test it with this:
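(The original test is not preserved; a hypothetical harness along the same lines: a fast producer and a consumer with a fixed per-batch cost, so batch sizes grow as the consumer falls behind.)

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 256)
    launch {
        repeat(10_000) { channel.send(it) }
        channel.close()
    }
    channel.consumeBatched(batchLimit = 256) { batch ->
        Thread.sleep(1)                                    // fixed latency per "INSERT"
        println("handled a batch of ${batch.size}")
    }
}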
consumeBatched() polls the queue one item at a time and therefore must additionally impose a batch limit. It would be more optimal if written against a concurrent queue like the Agrona project's OneToOneConcurrentArrayQueue, which supports the drain operation.
Could support for natural batching be considered as a feature to add?
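(For reference, here is roughly what the drain style looks like against Agrona's queue -- a sketch assuming the Agrona library is on the classpath, not part of the original issue text.)

import org.agrona.concurrent.OneToOneConcurrentArrayQueue

fun agronaDrainDemo() {
    val queue = OneToOneConcurrentArrayQueue<Int>(1024)
    repeat(10) { queue.offer(it) }
    val batch = ArrayList<Int>()
    // drain() hands every item currently observable in the queue to the consumer
    // and returns how many were drained -- one "natural" batch.
    val drained = queue.drain { batch.add(it) }
    println("drained $drained items as one batch: $batch")
}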
Comment by @qwwdfsad, taken from Stack Overflow:
It depends on the desired API surface.
A drain member is unlikely to fit channel semantics: it constrains the implementation, it should somehow expose a drain limit, and it gives the channel a more "collection-like" API. E.g., how should drain behave with an unlimited channel? Is it possible to implement drain in an efficient manner (with a pre-sized buffer, but avoiding OOMs and unlimited collections) once and use it with any channel implementation?
What could be improved is additional hints from the channel, such as expected capacity and count of enqueued elements. They can have relaxed semantics with a default implementation and act as hints to a drain extension with some reasonable configurable upper bounds.