Don't allocate threads on every dispatch in Native's thread pools #3595
tasksQueue.receive().run()
} else {
availableWorkers.send(privateChannel)
val task = privateChannel.receiveCatching().getOrNull()?.run()
I think we could quite easily add a withTimeout here, then getAndUpdate { if still considered to be sleeping, terminate }, and retry the obtainWorker(); this would allow graceful worker termination. It would complicate the code somewhat, so I didn't implement it initially. @qwwdfsad, do you think this could be worth it?
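A simplified sketch of the timeout idea, assuming a worker that pulls tasks from a channel. `idleWorkerLoop` and `idleTimeoutMs` are hypothetical names, not the PR's code. The worker waits at most `idleTimeoutMs` for its next task and returns (releasing its thread) on timeout or close. It deliberately omits the getAndUpdate state check and the obtainWorker() retry from the proposal, so it does not handle a task arriving just after the timeout fires.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical idle loop for one worker thread. Returns how many tasks it ran.
// Terminates when no task arrives within idleTimeoutMs, or when the channel is closed.
// NOT handled: a task sent right after the timeout would sit in the channel with
// no worker to run it (the getAndUpdate + obtainWorker() retry is meant to cover that).
fun idleWorkerLoop(tasks: ReceiveChannel<Runnable>, idleTimeoutMs: Long): Int = runBlocking {
    var executed = 0
    while (true) {
        // null means either "timed out" or "channel closed"; both end the loop.
        val task = withTimeoutOrNull(idleTimeoutMs) { tasks.receiveCatching().getOrNull() }
            ?: break
        task.run()
        executed++
    }
    executed
}

fun main() {
    val tasks = Channel<Runnable>(Channel.UNLIMITED)
    repeat(3) { i -> tasks.trySend(Runnable { println("task $i") }) }
    // Runs the three queued tasks, then gives up after 100 ms of idleness.
    println(idleWorkerLoop(tasks, idleTimeoutMs = 100))
}
```

The cost the comment mentions is visible even here: once a worker can leave on its own, the dispatcher has to reconcile "this worker looked idle" with "this worker already quit".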
My vote is to have a follow-up PR with the proposed change.
@ndkoval, the spirit of the SQS is strong here! :)
kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt
// spin loop until a worker that promised to be here actually arrives.
while (true) {
    val result = availableWorkers.tryReceive()
    return result.getOrNull() ?: continue
I'm a bit reluctant about having potentially infinite spin loops in case of an unforeseen bug or failure.
It would be nice to replace it with a (non-existent) receiveBlocking, which could have been a direct counterpart of sendBlocking.
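A minimal sketch of what such a `receiveBlocking` could look like on top of `runBlocking`. This helper is hypothetical (kotlinx.coroutines does not provide it), and `String` stands in for the real worker type. A missing worker then parks the thread instead of burning CPU in a spin loop, and a closed channel surfaces as `ClosedReceiveChannelException` from `receive()`.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

// Hypothetical helper, NOT part of kotlinx.coroutines: blocks the calling thread
// until an element arrives instead of spinning on tryReceive().
// If the channel is closed, receive() throws ClosedReceiveChannelException,
// so that failure is reported as an exception rather than an endless loop.
fun <E> ReceiveChannel<E>.receiveBlocking(): E = runBlocking { receive() }

fun main() {
    val availableWorkers = Channel<String>(Channel.UNLIMITED)
    // Another thread delivers the "promised" worker a little later.
    val producer = thread {
        Thread.sleep(50)
        availableWorkers.trySend("worker-1")
    }
    println(availableWorkers.receiveBlocking())
    producer.join()
}
```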
Fixed.
@@ -73,38 +74,80 @@ private class MultiWorkerDispatcher(
    workersCount: Int
) : CloseableCoroutineDispatcher() {
    private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
    private val availableWorkers = Channel<CancellableContinuation<Runnable>>(Channel.UNLIMITED)
Shouldn't its capacity be limited to workersCount?
It could. Would that improve anything? I wanted the code to say "we don't want to ever drop anything here", and "buffer size N" does imply "drop if larger than N."
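For reference, a small sketch of how a bounded kotlinx.coroutines channel actually behaves when its buffer is full (plain `Int` elements are used for illustration). With the default `BufferOverflow.SUSPEND`, nothing is dropped: `trySend` reports failure and `send` would suspend. Dropping only happens when an overflow policy such as `BufferOverflow.DROP_OLDEST` is requested explicitly.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel

fun main() {
    // Capacity 2, default BufferOverflow.SUSPEND: the third element is rejected,
    // not silently dropped (send() would suspend until space frees up).
    val bounded = Channel<Int>(2)
    println((1..3).map { bounded.trySend(it).isSuccess }) // [true, true, false]

    // An explicit DROP_OLDEST policy: every trySend succeeds, and the oldest
    // buffered element (1) is discarded to make room for 3.
    val dropping = Channel<Int>(2, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    (1..3).forEach { dropping.trySend(it) }
    println(dropping.tryReceive().getOrNull()) // 2
}
```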
Nothing really; I was just surprised that it is unlimited while effectively never holding more than workersCount elements/waiters.
Regarding "we don't want to ever drop anything here": maybe it's worth adding asserts/postconditions to each trySend?
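One possible shape for such a postcondition, as a sketch only: `trySendOrFail` is a hypothetical helper (the checks actually added in the PR may look different), and `String` stands in for the real `CancellableContinuation<Runnable>` element type. For a `Channel.UNLIMITED` channel, `trySend` fails only once the channel is closed, so a failed check points to a real bug.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel

// Hypothetical helper: trySend plus a postcondition, so a send that
// "can never fail" but does fail is reported loudly instead of ignored.
fun <E> SendChannel<E>.trySendOrFail(element: E) {
    val result = trySend(element)
    check(result.isSuccess) { "trySend($element) unexpectedly failed: $result" }
}

fun main() {
    val availableWorkers = Channel<String>(Channel.UNLIMITED)
    availableWorkers.trySendOrFail("worker-1") // succeeds: unlimited and still open
    availableWorkers.close()
    // After close(), the postcondition fires with an IllegalStateException.
    println(runCatching { availableWorkers.trySendOrFail("worker-2") }.isFailure) // true
}
```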
I added some checks. Are they what you meant?
That's it, thanks.
It seems we only have two minor questions left, and then it's good to go.
} | ||
} finally { | ||
dispatcher.close() | ||
} |
This test failed in the CI with encounteredWorkers.size == 1, but I fail to see how that's possible. Can different workers be equal to each other?
It seems like there are at least two data races:
- One is over canStart1/canStart2.
- Another is the concurrent mutation of a non-thread-safe MutableSet, which probably causes the failure.
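A JVM-flavoured sketch of the second fix, assuming the test records a distinct value per observation from several threads; `collectDistinct` and its parameters are hypothetical, not the PR's test. `ConcurrentHashMap.newKeySet()` tolerates concurrent `add()` calls, whereas a plain `mutableSetOf()` shared the same way is a data race. On Kotlin/Native, `java.util.concurrent` is unavailable, so a different synchronization primitive would be needed.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.thread

// Each of threadCount threads adds perThread distinct values to a shared set.
// A thread-safe set guarantees none of the concurrent add() calls are lost.
fun collectDistinct(threadCount: Int, perThread: Int): Int {
    val encountered: MutableSet<Int> = ConcurrentHashMap.newKeySet<Int>()
    val threads = List(threadCount) { t ->
        thread { repeat(perThread) { i -> encountered.add(t * perThread + i) } }
    }
    threads.forEach { it.join() }
    return encountered.size
}

fun main() {
    println(collectDistinct(threadCount = 4, perThread = 1_000)) // 4000
}
```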
Re: "non-thread-safe MutableSet": Wow, what a ridiculous mistake. Looks like I've exhausted my capacity for understanding concurrency on this PR's main code.
Re: "canStart1/canStart2": I still fail to see the problem, sorry. Could you spell it out?
We've all been there :)
Re: "I still fail to see the problem, sorry, could you spell it out?": A non-atomic variable is being read and written concurrently, and I'm honestly not sure what guarantees K/N (or LLVM) provides in such scenarios. In theory, the compiler is allowed to hoist the memory read in while (!canStart*) out of the loop.
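To illustrate the hoisting concern, a minimal JVM sketch; `waitForFlag` is a hypothetical name and `canStart` plays the role of the test's `canStart1`/`canStart2`. With a plain `var canStart = false`, the compiler may read it once and turn the loop into an endless spin. Reading an `AtomicBoolean` on every iteration rules that out and guarantees the writer's update becomes visible. On Kotlin/Native an atomic type from that platform would play the same role.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread

// The waiter polls an atomic flag, so the read cannot be hoisted out of the loop.
fun waitForFlag(): Boolean {
    val canStart = AtomicBoolean(false)
    var started = false
    val waiter = thread {
        while (!canStart.get()) { /* spin until the flag is published */ }
        started = true
    }
    canStart.set(true)
    waiter.join() // join() makes the waiter's write to `started` visible here
    return started
}

fun main() {
    println(waitForFlag()) // true
}
```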
LGTM. It seems like Kover verification started to fail; code coverage probably dropped somewhere. Please take a look before merging.
Kover failed because it tried to run all the tests, and
See also #3576