-
Notifications
You must be signed in to change notification settings - Fork 653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FIX-#7346: Handle execution on Dask workers to avoid creating conflic… #7347
Open
data-makerman
wants to merge
7
commits into
modin-project:main
Choose a base branch
from
data-makerman:Fix-#7346--Handle-execution-on-Dask-workers-to-avoid-creating-conflicting-Clients
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 1 commit
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
b5f50b9
FIX-#7346: Handle execution on Dask workers to avoid creating conflic…
data-makerman 3d17773
Add tests for FIX-#7346
data-makerman 6eb5442
Update modin/tests/core/storage_formats/pandas/test_internals.py
data-makerman c6eb900
Fixed linting issues
data-makerman 36e1036
Include fixture in test function for Engine selection
data-makerman 036b91e
Fix linting error due to whitespace
data-makerman 9ff6261
Merge branch 'main' into Fix-#7346--Handle-execution-on-Dask-workers-…
data-makerman File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next
Next commit
FIX-#7346: Handle execution on Dask workers to avoid creating conflic…
…ting Clients Signed-off-by: Michael Akerman <michaeljohnakerman@gmail.com>
- Loading branch information
commit b5f50b9e65b97c197b2923939b154b9b6d09e70b
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder why initialize_dask is called in a worker? Could you verify if this is the case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I definitely don't fully understand it. I can verify that this PR stops the issue and prevents multiple instances of a Dask cluster from starting under any of the operations where I was observing the behavior. Since it occurs in an
apply
context on a remote worker, it was beyond my available time (or existing technical skill) to debug exactly what was happening on the Dask worker. It seems possible that there's some other root cause leading to a call toinitialize_dask
.I can verify by inference that
initialize_dask
is being called inside a worker, because it appears to be the only place in Modin 0.31 where thedistributed.Client
class is ever instantiated, and I can observe in the stdout that multiple Clients are being created as daemonic processes on Dask during theapply
operation demonstrated in #7346, but only when working with Modin (not with the equivalent operation in Pandas).I can hazard a partial guess as to what might be happening that would require further study based on some very confusing behavior I observed: sometimes while attempting to use
client.submit(lambda x: x.apply(foo), pandas_df)
directly on a Pandas dataframe (not Modin), I saw the same error, but only if Modin had been imported usingimport modin.pandas as pd
. It made me wonder if Dask was calling apd
function whilepd
had been masked in the worker's namespace by Modin?I think I can probably create a working example of that if I have enough time later, which might help find the root cause.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I have a decent understanding of what is going on, but there's still something weird happening that I can't explain.
Modin is never fully initialized on the workers. Modin's initialization code is never run on the workers, unless a worker runs a task requires subscribing to an engine it will never have this problem. Link:
modin/modin/core/execution/dispatching/factories/dispatcher.py
Lines 99 to 118 in f5f9ae9
The
Series
constructor ends up in this path viafrom_pandas
. So when you callpd.Series(...)
from within anapply
function, cloudpickle will serialize all of the dependencies of that call and then unpack it within a worker. This could potentially explain why @data-makerman is seeing this happening with pandas after using Modin (but that still seems like a bug to me and is worth investigating more).Now, what I don't understand is that in
ipython
andjupyter
, instead of hitting this problem we have something else entirely, which is that Modin starts a Ray cluster inside a Dask cluster:I don't really even understand why it's different, because if I use
%run issue-7346.py
within ipython I get the same issue as before with Dask:So I think this patch is absolutely correct in detecting that we are in a worker and avoiding initializing a second Dask cluster, but I will follow on with a patch for this weird ipython issue.
Also, the reason this doesn't happen with Ray is because Ray uses a global variable that all workers share to ensure that one and only one Ray cluster are initialized in the same client/worker. We bypass initialization if Ray is initialized here:
modin/modin/core/execution/ray/common/utils.py
Line 96 in f5f9ae9
Ray will always be initialized from within a worker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This probably has to do with the fact that workers know nothing about Modin configs set in the main process.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@YarShev I think that is correct, but do you know why it would be different in
ipython
vs the python script?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would guess there is a different strategy for importing modules in
ipython
.