forked from Kotlin/kotlinx.coroutines
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduce CoroutineDispatcher.limitedParallelism and make Dispatchers…
….IO unbounded for limited parallelism (Kotlin#2918) * Introduce CoroutineDispatcher.limitedParallelism for granular concurrency control * Elastic Dispatchers.IO: * Extract Ktor-obsolete API to a separate file for backwards compatibility * Make Dispatchers.IO being a slice of unlimited blocking scheduler * Make Dispatchers.IO.limitParallelism take slices from the same internal scheduler Fixes Kotlin#2943 Fixes Kotlin#2919
- Loading branch information
1 parent
ee360ed
commit 45da790
Showing
29 changed files
with
755 additions
and
347 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
105 changes: 105 additions & 0 deletions
105
kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/* | ||
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package kotlinx.coroutines.internal | ||
|
||
import kotlinx.coroutines.* | ||
import kotlin.coroutines.* | ||
import kotlin.jvm.* | ||
|
||
/** | ||
* The result of .limitedParallelism(x) call, a dispatcher | ||
* that wraps the given dispatcher, but limits the parallelism level, while | ||
* trying to emulate fairness. | ||
*/ | ||
internal class LimitedDispatcher( | ||
private val dispatcher: CoroutineDispatcher, | ||
private val parallelism: Int | ||
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) { | ||
|
||
@Volatile | ||
private var runningWorkers = 0 | ||
|
||
private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false) | ||
|
||
@ExperimentalCoroutinesApi | ||
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { | ||
parallelism.checkParallelism() | ||
if (parallelism >= this.parallelism) return this | ||
return super.limitedParallelism(parallelism) | ||
} | ||
|
||
override fun run() { | ||
var fairnessCounter = 0 | ||
while (true) { | ||
val task = queue.removeFirstOrNull() | ||
if (task != null) { | ||
try { | ||
task.run() | ||
} catch (e: Throwable) { | ||
handleCoroutineException(EmptyCoroutineContext, e) | ||
} | ||
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well | ||
if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) { | ||
// Do "yield" to let other views to execute their runnable as well | ||
// Note that we do not decrement 'runningWorkers' as we still committed to do our part of work | ||
dispatcher.dispatch(this, this) | ||
return | ||
} | ||
continue | ||
} | ||
|
||
@Suppress("CAST_NEVER_SUCCEEDS") | ||
synchronized(this as SynchronizedObject) { | ||
--runningWorkers | ||
if (queue.size == 0) return | ||
++runningWorkers | ||
fairnessCounter = 0 | ||
} | ||
} | ||
} | ||
|
||
override fun dispatch(context: CoroutineContext, block: Runnable) { | ||
dispatchInternal(block) { | ||
dispatcher.dispatch(this, this) | ||
} | ||
} | ||
|
||
@InternalCoroutinesApi | ||
override fun dispatchYield(context: CoroutineContext, block: Runnable) { | ||
dispatchInternal(block) { | ||
dispatcher.dispatchYield(this, this) | ||
} | ||
} | ||
|
||
private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) { | ||
// Add task to queue so running workers will be able to see that | ||
if (addAndTryDispatching(block)) return | ||
/* | ||
* Protect against the race when the number of workers is enough, | ||
* but one (because of synchronized serialization) attempts to complete, | ||
* and we just observed the number of running workers smaller than the actual | ||
* number (hit right between `--runningWorkers` and `++runningWorkers` in `run()`) | ||
*/ | ||
if (!tryAllocateWorker()) return | ||
dispatch() | ||
} | ||
|
||
private fun tryAllocateWorker(): Boolean { | ||
@Suppress("CAST_NEVER_SUCCEEDS") | ||
synchronized(this as SynchronizedObject) { | ||
if (runningWorkers >= parallelism) return false | ||
++runningWorkers | ||
return true | ||
} | ||
} | ||
|
||
private fun addAndTryDispatching(block: Runnable): Boolean { | ||
queue.addLast(block) | ||
return runningWorkers >= parallelism | ||
} | ||
} | ||
|
||
// Save a few bytecode ops | ||
internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.