Don't allocate threads on every dispatch in Native's thread pools #3595

Merged: qwwdfsad merged 7 commits into develop from native-dont-spawn-extraneous-workers on Feb 13, 2023

dkhalanskyjb
Collaborator

See also #3576

dkhalanskyjb requested a review from qwwdfsad on January 20, 2023 14:21

```kotlin
    tasksQueue.receive().run()
} else {
    availableWorkers.send(privateChannel)
    val task = privateChannel.receiveCatching().getOrNull()?.run()
```

Collaborator Author

dkhalanskyjb, Jan 20, 2023

I think we could fairly easily add a withTimeout here, then do getAndUpdate { if still considered to be sleeping, terminate } and retry obtainWorker(); this would allow workers to terminate gracefully. It would complicate the code somewhat, though, so I didn't implement it initially. @qwwdfsad, do you think it could be worth it?
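A rough sketch of an idle timeout in a worker loop, assuming kotlinx.coroutines is available. It is not the proposed follow-up: idleAwareWorker and idleTimeoutMs are hypothetical names, the worker pulls tasks from a shared channel instead of waiting for the dispatcher to hand it one, and instead of withTimeout plus a getAndUpdate on the worker's state it swaps in select with an onTimeout clause. select commits to exactly one clause, so a task cannot be taken out of the channel and then lost because the timer fired at the same moment. A plain withTimeout around receive() does not give that guarantee, because a receive cancelled by the timeout can discard an element it has already retrieved.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select

// Hypothetical worker loop: runs tasks until it has been idle for
// `idleTimeoutMs` (or the channel is closed), then returns so that the
// worker can be terminated. Returns the number of tasks it executed.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun idleAwareWorker(tasks: ReceiveChannel<Runnable>, idleTimeoutMs: Long): Int {
    var executed = 0
    while (true) {
        // `select` picks exactly one clause: either a task is received or the
        // timeout fires, never both, so no task is lost on the way out.
        val task: Runnable? = select<Runnable?> {
            tasks.onReceiveCatching { result -> result.getOrNull() } // null if closed
            onTimeout(idleTimeoutMs) { null }
        }
        if (task == null) return executed
        task.run()
        executed++
    }
}
```

When the function returns, the caller is free to let the worker's thread exit.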

Collaborator

qwwdfsad, Feb 7, 2023

My vote is to have a follow-up PR with the proposed change

dkhalanskyjb
Collaborator Author

@ndkoval, the spirit of the SQS is strong here! :)

dkhalanskyjb force-pushed the native-dont-spawn-extraneous-workers branch 2 times, most recently from ce6705c to cf25934, on January 20, 2023 14:35
dkhalanskyjb force-pushed the native-dont-spawn-extraneous-workers branch from cf25934 to 8330a36 on January 20, 2023 14:41

```kotlin
// spin loop until a worker that promised to be here actually arrives.
while (true) {
    val result = availableWorkers.tryReceive()
    return result.getOrNull() ?: continue
}
```

Collaborator

I'm a bit reluctant about having potentially infinite spin loops in case of an unforeseen bug or failure.
It would be nice to replace this with a (non-existent) receiveBlocking, which would be a direct counterpart of sendBlocking.
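Such a blocking receive can be approximated with runBlocking, which parks the calling thread until the suspending receive() completes instead of burning CPU in a tryReceive() loop. A minimal sketch, assuming kotlinx.coroutines; receiveBlockingSketch is a hypothetical name, not a library API. It is only meant for threads that are not running coroutines, and if the promised element never arrives it still waits forever, just without spinning.

```kotlin
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking

// Hypothetical helper, not part of kotlinx.coroutines: blocks the calling
// (non-coroutine) thread until an element is available, without spinning.
fun <E> ReceiveChannel<E>.receiveBlockingSketch(): E = runBlocking { receive() }
```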

Collaborator Author

Fixed.

```
@@ -73,38 +74,80 @@ private class MultiWorkerDispatcher(
    workersCount: Int
) : CloseableCoroutineDispatcher() {
    private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
    private val availableWorkers = Channel<CancellableContinuation<Runnable>>(Channel.UNLIMITED)
```

Collaborator

Shouldn't its capacity be limited to workersCount?

Collaborator Author

It can. Would it improve something? I wanted the code to say "we don't want to ever drop anything here", and "buffer size N" does imply "drop if larger than N."

Collaborator

qwwdfsad, Feb 7, 2023

Nothing really; I was just surprised that it's unlimited while effectively never holding more than workersCount elements/waiters.

> we don't want to ever drop anything here

Maybe it's worth adding asserts/postconditions to each trySend?
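A postcondition on each trySend could look like this minimal sketch, assuming kotlinx.coroutines; trySendOrFail is a hypothetical helper name. With Channel.UNLIMITED the check documents the "never drop anything" intent. With a bounded channel and the default BufferOverflow.SUSPEND policy, trySend on a full buffer returns a failed result rather than dropping the element by itself, so an element is only lost if the caller ignores that result; the check turns that case into an immediate error.

```kotlin
import kotlinx.coroutines.channels.SendChannel

// Hypothetical helper: fail loudly instead of silently ignoring a failed trySend.
fun <E> SendChannel<E>.trySendOrFail(element: E) {
    val result = trySend(element)
    check(result.isSuccess) { "trySend($element) unexpectedly failed: $result" }
}
```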

Collaborator Author

I added some checks. Are they what you meant?

Collaborator

That's it, thanks
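In the hunk above, availableWorkers holds CancellableContinuation<Runnable> values. One way to use such a channel: an idle worker parks by putting its own continuation there and suspending, and dispatching a task means taking a continuation out and resuming it with the task. The toy model below assumes kotlinx.coroutines; ToyPool and submit are hypothetical names, and it is not the MultiWorkerDispatcher code (workers are plain coroutines in a caller-supplied scope, and shutdown is ignored).

```kotlin
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume

// Hypothetical toy pool: idle workers park their continuations in
// `availableWorkers`; `submit` resumes one of them with the task.
class ToyPool(workers: Int, scope: CoroutineScope) {
    private val availableWorkers =
        Channel<CancellableContinuation<Runnable>>(Channel.UNLIMITED)

    init {
        repeat(workers) {
            scope.launch {
                while (true) {
                    // Park: publish our continuation, then suspend until resumed with a task.
                    val task = suspendCancellableCoroutine<Runnable> { cont ->
                        check(availableWorkers.trySend(cont).isSuccess)
                    }
                    task.run()
                }
            }
        }
    }

    // Suspends until some worker is idle, then hands the task directly to it.
    suspend fun submit(task: Runnable) {
        availableWorkers.receive().resume(task)
    }
}
```

A real implementation would also have to handle a worker cancelled while parked: its continuation stays in the channel, and resuming a cancelled continuation silently drops the task.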

qwwdfsad
Collaborator

qwwdfsad commented Feb 7, 2023

Seems like we only have two minor questions and then it's good to go

dkhalanskyjb requested a review from qwwdfsad on February 9, 2023 14:43

```kotlin
    }
} finally {
    dispatcher.close()
}
```

Collaborator Author

This test failed on CI with encounteredWorkers.size == 1, but I fail to see how that's possible. Can different workers be equal to each other?

Collaborator

It seems like there are at least two data races:

  • One is over canStart1/canStart2
  • Another is the concurrent mutation of a non-thread-safe MutableSet, which probably causes the failure
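The MutableSet race can be removed by guarding every access with a kotlinx.coroutines Mutex. A minimal sketch, assuming Dispatchers.Default as the multithreaded dispatcher; collectConcurrently is a hypothetical name, and it records plain integers rather than the workers tracked by encounteredWorkers in the actual test.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical stand-in for the test: `n` coroutines on a multithreaded
// dispatcher each record a value in a shared set.
suspend fun collectConcurrently(n: Int): Set<Int> {
    val seen = mutableSetOf<Int>() // not thread-safe by itself
    val lock = Mutex()
    coroutineScope {
        repeat(n) { i ->
            launch(Dispatchers.Default) {
                // Every mutation happens under the lock, so adds never interleave.
                lock.withLock { seen.add(i) }
            }
        }
    }
    // coroutineScope returns only after all children complete, so reading here is safe.
    return seen.toSet()
}
```

Returning the values as the tasks' results (for example with async) would avoid the shared set altogether.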

Collaborator Author

> non-thread-safe MutableSet

Wow, what a ridiculous mistake. Looks like I spent my whole capacity for understanding concurrency in this PR on the main code.

> canStart1/canStart2

I still fail to see the problem, sorry, could you spell it out?

Collaborator

qwwdfsad, Feb 10, 2023

We all have been there :)

> I still fail to see the problem, sorry, could you spell it out?

A non-atomic variable is being read and written concurrently, and I'm honestly not sure what guarantees K/N (or LLVM) provides in such scenarios. In theory, the compiler is allowed to hoist the memory read in while (!canStart*) out of the loop.
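Instead of spinning on a plain var, whose read the compiler may hoist out of the loop, the start signal can go through a synchronization primitive. A minimal sketch, assuming kotlinx.coroutines: handOff is a hypothetical name, and CompletableDeferred stands in for the canStart1/canStart2 flags. The waiting side suspends instead of spinning, and the value travels with the signal, so no unsynchronized shared variable is left.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope

// Hypothetical example: a worker on another thread waits for a start signal
// that also carries its input.
suspend fun handOff(value: Int): Int = coroutineScope {
    val canStart = CompletableDeferred<Int>()
    val worker = async(Dispatchers.Default) {
        val input = canStart.await() // suspends until complete() is called; no spinning
        input * 2
    }
    canStart.complete(value) // replaces the plain flag write
    worker.await()
}
```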

Collaborator

qwwdfsad left a comment

LGTM. It seems like Kover verification started to fail; probably code coverage dropped somewhere. Please take a look before merging.

dkhalanskyjb
Collaborator Author

Kover failed because it tried to run all the tests, and testStackMergeWithContext failed. Locally, ./gradlew koverVerify koverReport -Pkover.enabled=true passes without issues.

qwwdfsad merged commit e946cd7 into develop on Feb 13, 2023
qwwdfsad deleted the native-dont-spawn-extraneous-workers branch on February 13, 2023 09:22