-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduce a separate slot for stealing tasks into in CoroutineSchedul…
…er (#3537) * Introduce a separate slot for stealing tasks into in CoroutineScheduler It solves two problems: * Stealing into exclusively owned local queue does no longer require CAS'es or atomic operations where they were previously not needed. It should save a few cycles on the stealing code path * The overall timing perturbations should be slightly better now: previously it was possible for the stolen task to be immediately got stolen again from the stealer thread because it was actually published to the owner's queue, but its submission time was never updated (#3416) * Move victim argument in WorkQueue into the receiver position to simplify the overall code structure * Fix oversubscription in CoroutineScheduler (-> Dispatchers.Default) (#3418) Previously, a worker thread unconditionally processed tasks from its own local queue, even if tasks were CPU-intensive, but CPU token was not acquired. Fixes #3416 Fixes #3418
- Loading branch information
Showing
5 changed files
with
209 additions
and
70 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
89 changes: 89 additions & 0 deletions
89
kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerOversubscriptionTest.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
/* | ||
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package kotlinx.coroutines.scheduling | ||
|
||
import kotlinx.coroutines.* | ||
import org.junit.Test | ||
import java.util.concurrent.* | ||
import java.util.concurrent.atomic.AtomicInteger | ||
|
||
class CoroutineSchedulerOversubscriptionTest : TestBase() { | ||
|
||
private val inDefault = AtomicInteger(0) | ||
|
||
private fun CountDownLatch.runAndCheck() { | ||
if (inDefault.incrementAndGet() > CORE_POOL_SIZE) { | ||
error("Oversubscription detected") | ||
} | ||
|
||
await() | ||
inDefault.decrementAndGet() | ||
} | ||
|
||
@Test | ||
fun testOverSubscriptionDeterministic() = runTest { | ||
val barrier = CountDownLatch(1) | ||
val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE) | ||
// All threads but one | ||
repeat(CORE_POOL_SIZE - 1) { | ||
launch(Dispatchers.Default) { | ||
threadsOccupiedBarrier.await() | ||
barrier.runAndCheck() | ||
} | ||
} | ||
threadsOccupiedBarrier.await() | ||
withContext(Dispatchers.Default) { | ||
// Put a task in a local queue, it will be stolen | ||
launch(Dispatchers.Default) { | ||
barrier.runAndCheck() | ||
} | ||
// Put one more task to trick the local queue check | ||
launch(Dispatchers.Default) { | ||
barrier.runAndCheck() | ||
} | ||
|
||
withContext(Dispatchers.IO) { | ||
try { | ||
// Release the thread | ||
delay(100) | ||
} finally { | ||
barrier.countDown() | ||
} | ||
} | ||
} | ||
} | ||
|
||
@Test | ||
fun testOverSubscriptionStress() = repeat(1000 * stressTestMultiplierSqrt) { | ||
inDefault.set(0) | ||
runTest { | ||
val barrier = CountDownLatch(1) | ||
val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE) | ||
// All threads but one | ||
repeat(CORE_POOL_SIZE - 1) { | ||
launch(Dispatchers.Default) { | ||
threadsOccupiedBarrier.await() | ||
barrier.runAndCheck() | ||
} | ||
} | ||
threadsOccupiedBarrier.await() | ||
withContext(Dispatchers.Default) { | ||
// Put a task in a local queue | ||
launch(Dispatchers.Default) { | ||
barrier.runAndCheck() | ||
} | ||
// Put one more task to trick the local queue check | ||
launch(Dispatchers.Default) { | ||
barrier.runAndCheck() | ||
} | ||
|
||
withContext(Dispatchers.IO) { | ||
yield() | ||
barrier.countDown() | ||
} | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.