Introduce CoroutineDispatcher.limitedParallelism as an alternative to standalone executors and slices #2919
Very nice API, I like the interesting capabilities it can give with such simplicity.
…ate API * Mention CoroutineDispatcher.limitedParallelism as an intended replacement * Prepare the API to sharing with K/N new memory model where we _have_ to have this API Addresses #2919
The complementary proposal: #2943
This proposal is a great improvement of
…ate API (Kotlin#2922) * Mention CoroutineDispatcher.limitedParallelism as an intended replacement * Prepare the API to sharing with K/N new memory model where we _have_ to have this API Addresses Kotlin#2919
….IO unbounded for limited parallelism (Kotlin#2918) * Introduce CoroutineDispatcher.limitedParallelism for granular concurrency control * Elastic Dispatchers.IO: * Extract Ktor-obsolete API to a separate file for backwards compatibility * Make Dispatchers.IO being a slice of unlimited blocking scheduler * Make Dispatchers.IO.limitParallelism take slices from the same internal scheduler Fixes Kotlin#2943 Fixes Kotlin#2919
I can't wait for this, yes please! Meanwhile, I'm using https://stackoverflow.com/questions/47686353/how-to-cap-kotlin-coroutines-maximum-concurrency/65046522#65046522
@qwwdfsad Hello, I've found some inconsistency with
Example:
import kotlinx.coroutines.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

suspend fun main() {
    runBlocking {
        println("Start")
        testDispatchers()
        println("finish")
    }
}

suspend fun testDispatchers() = coroutineScope {
    val dispatcher = Dispatchers.Default

    println("Test with limitedParallelism:")
    // This one works in a not-expected way!!!
    runTestWithDispatcher(dispatcher, dispatcher.limitedParallelism(1))

    println("\nTest with newSingleThreadContext:")
    // This one works OK.
    runTestWithDispatcher(dispatcher, newSingleThreadContext("SingleThread"))

    println("\nTest with newSingleThreadExecutor:")
    // This one works OK, but needs to be shut down explicitly.
    val singleExecutorDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    runTestWithDispatcher(dispatcher, singleExecutorDispatcher)
    // We need to explicitly shut down the ExecutorService to finish the root coroutine.
    (singleExecutorDispatcher.executor as? ExecutorService)?.shutdown()
}

suspend fun runTestWithDispatcher(dispatcher: CoroutineDispatcher, limitedDispatcher: CoroutineDispatcher) = coroutineScope {
    // First pair of jobs: one on the base dispatcher, one on the limited one.
    buildList {
        this += launch(dispatcher) {
            println("1 final job with thread '${Thread.currentThread().name}'")
            Thread.sleep(100)
        }
        this += launch(limitedDispatcher) {
            println("2 final job (limited) with thread '${Thread.currentThread().name}'")
            Thread.sleep(100)
        }
    }.joinAll()

    // Second pair of jobs, started only after the first pair has completed.
    buildList {
        this += launch(dispatcher) {
            println("3 final job with thread '${Thread.currentThread().name}'")
            Thread.sleep(100)
        }
        this += launch(limitedDispatcher) {
            println("4 final job (limited) with thread '${Thread.currentThread().name}'")
            Thread.sleep(100)
        }
    }.joinAll()
}
Output:
This issue doesn't exist if I remove
Is it a bug?
@ultraon this is expected behaviour.
@qwwdfsad It is so interesting. And if I need to run coroutines in a dedicated thread, what is the preferred approach (taking into account that
See the original idea behind slices here: #261
Since then, many APIs have been introduced and refined, so the originally proposed API no longer fits kotlinx.coroutines.
Use-cases
newFixedThreadPoolContext and newSingleThreadContext, as well as regular Java executors, are often used as the only way to control concurrency. Various factors contribute to that: limited external resources (e.g. a pool of database connections), functional restrictions (e.g. CPU consumption or rate-limiting), or the need to have an isolated serializable execution flow (an extreme example would be the IDEA code-base, which contains 62 single-threaded dispatchers).
As a result, an average application creates unnecessarily many threads, most of which are idle, consuming memory, CPU, and device battery.
Design overview
To address such use-cases, we aim to provide an API to take a view of a dispatcher. A dispatcher's view does not create any additional threads and does not allocate any resources. Instead, it piggybacks on the original dispatcher but limits the parallelism of submitted tasks and coroutines. Views themselves are independent, and it is possible to have views whose total declared parallelism exceeds that of the original dispatcher. This saves users from meticulous counting of resources, especially when views are used along with the global Dispatchers.Default and Dispatchers.IO.
With that API, it is possible to have fine-grained control over the parallelism and leverage already existing dispatchers and executors, saving resources and reducing the number of threads.
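A minimal sketch of what limiting parallelism through a view looks like in practice, assuming the limitedParallelism name from this proposal's title. The helper maxObservedConcurrency, the task count, and the 20 ms sleep are hypothetical and exist only for this illustration.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: launches `tasks` blocking jobs on `view` and reports
// the highest number of jobs that were observed running at the same time.
fun maxObservedConcurrency(view: CoroutineDispatcher, tasks: Int): Int = runBlocking {
    val running = AtomicInteger(0)
    val maxSeen = AtomicInteger(0)
    List(tasks) {
        launch(view) {
            val now = running.incrementAndGet()
            maxSeen.updateAndGet { prev -> maxOf(prev, now) }
            Thread.sleep(20) // simulate blocking work, e.g. a database call
            running.decrementAndGet()
        }
    }.joinAll()
    maxSeen.get()
}

@OptIn(ExperimentalCoroutinesApi::class)
fun main() {
    // The view creates no threads of its own; it borrows them from Dispatchers.IO
    // and allows at most 2 of the submitted tasks to run at once.
    val view = Dispatchers.IO.limitedParallelism(2)
    println("max concurrent tasks: ${maxObservedConcurrency(view, tasks = 10)}")
}
```

The printed value should never exceed the declared limit of 2, even though Dispatchers.IO itself can run many more tasks in parallel.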
The API changes are minimal: instead of creating a standalone thread pool or executor, it is possible to take a view of an existing dispatcher.
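As a rough before/after illustration of that change (the names dbPool, dbView, threadNameOn, the pool name "DB", and the limit of 4 are assumptions made for this sketch, not taken from the proposal):

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs a trivial task on the given dispatcher
// and returns the name of the thread that executed it.
suspend fun threadNameOn(dispatcher: CoroutineDispatcher): String =
    withContext(dispatcher) { Thread.currentThread().name }

@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun main(): Unit = runBlocking {
    // Before: a standalone pool that owns its threads and must be closed explicitly.
    val dbPool = newFixedThreadPoolContext(4, "DB")
    try {
        println("pool thread: " + threadNameOn(dbPool))
    } finally {
        dbPool.close()
    }

    // After: a view of Dispatchers.IO with the same parallelism limit.
    // It has no threads of its own, so there is nothing to close.
    val dbView = Dispatchers.IO.limitedParallelism(4)
    println("view thread: " + threadNameOn(dbView))
}
```

The view-based version keeps the concurrency cap of the pool but drops the lifecycle management, since its threads belong to the dispatcher it was derived from.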
API changes
Conceptually, any coroutine dispatcher can limit its own parallelism. If we allow the sum of views to be greater than the original dispatcher, nothing prevents users from taking a view of a single-threaded dispatcher (e.g. Dispatchers.Main), as long as its contract allows it to short-circuit the implementation to return this.
As a result, a single public signature will be added to the CoroutineDispatcher class, with the default implementation provided.
Rejected alternative namings
While short and visually attractive, the following names are easy to misinterpret:
slice, subset, and view: it can be reasonable to assume that the sum of all the slices/subsets/views cannot be greater than the original.
dedicated: it can be reasonable to assume that dedicated threads are used for such a dispatcher.
limited: what is being limited is conveyed only by the parameter's name. We cannot enforce the usage of named parameters on the language level, and seeing myDispatcher.limited(2) or myDispatcher.limited(myLimit) in the wild may be quite misleading.