-
Notifications
You must be signed in to change notification settings - Fork 531
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix for zero length collective IO (issue #965) #1206
base: master
Are you sure you want to change the base?
Conversation
Collective IO where one rank attempts to read or write a zero length slice causes the a hang in an underlying collective call. This fix ensures that all ranks participate in any collective IO operation even if some operations are on zero length slices. This also includes some basic unit testing for collective IO that tests for this problem.
Codecov Report
@@ Coverage Diff @@
## master #1206 +/- ##
==========================================
- Coverage 83.79% 83.67% -0.12%
==========================================
Files 18 18
Lines 2160 2169 +9
==========================================
+ Hits 1810 1815 +5
- Misses 350 354 +4
Continue to review full report at Codecov.
|
This looks OK to me, but as I don't use MPI, I'd like someone who does to review it. Notes:
|
Thanks @takluyver. That all makes sense. I will add some more inline comments to explain exactly why this needs to be done, and I'll update the parameter names. As for why this was being done in the first place, I'm not sure. Presumably as a performance optimisation, although whether it's a worthwhile one I don't know. For this PR I wanted to keep it minimal, it seemed like it could be a subtly breaking change for a much larger pool of users to now start doing the HDF5 calls for empty selections. Finally on the CI server front, I don't see why it couldn't be run the CI server. You just need to call your test runner for that test only with |
The CI is a bit complex. It's all controlled through tox, which is configured by the Then the Travis and Appveyor config files largely just specify which tox environment to run in each job (plus some other bits and pieces to ensure it can build against HDF5 properly. There's an Alternatively, if there are only going to be a few tests that need this, it might be simpler for the test code itself to start mpirun as a subprocess, run a script, and assert on the expected results from the script. This could also make it practical to turn those hangs into failures, by imposing a (generous) time-limit on waiting for the subprocess. |
h5py/_hl/dataset.py
Outdated
@@ -552,7 +552,14 @@ def __getitem__(self, args): | |||
# Perform the dataspace selection. | |||
selection = sel.select(self.shape, args, dsid=self.id) | |||
|
|||
if selection.nselect == 0: | |||
# If we are running in MPI mode we need to check if we are set in | |||
# collective IO mode to ensure we actually perform all IO modes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the comment could use a bit of extra explanation, something like:
...so that it doesn't get stuck waiting for an I/O operation that some workers have skipped
(IDK if 'workers' is the standard term in MPI, but I trust you'll know this)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Worker would make sense, but I think "MPI process" is most clear, so I'll use that.
h5py/_hl/selections.py
Outdated
""" Get an iterable for broadcasting. | ||
|
||
Setting `collective` includes zero length selections that are | ||
required for collective operations. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, looking at it some more, does the selections code need to know about collective mode at all? Shouldn't it handle empty selections correctly regardless?
I appreciate the desire to not change anything for normal I/O. But if we can avoid special-casing it as far as possible, the code is cleaner and better tested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that it would be best to avoid special cases. If you're amenable to that I'll try and go that route. Hopefully the tests will illuminate any unintended problems.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see. The obvious thing for a generator over an empty selection to do is to yield nothing. But that won't work in collective mode.
What happens if the different workers get different (non-zero) numbers of chunks from .broadcast()
? Does that cause a hang because they're doing different numbers of HDF5 writes?
I feel like I'm missing something here. It shouldn't need this much code to be aware that it's doing something collectively. Maybe the answer is just to block broadcasting for collective I/O, so you have to set up the right shape of data yourself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if the different workers get different (non-zero) numbers of chunks from
.broadcast()
? Does that cause a hang because they're doing different numbers of HDF5 writes?
That is exactly what happens.
However, empirically, just changing the clause in the .broadcast()
from
- if nchunks == 1 or (nchunks == 0 and collective):
+ if nchunks < 2:
does actually work fine. I also think it probably doesn't have any odd side effects because if nchunks
was zero, then the nselect == 0
check probably means that normal IO calls don't get there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@takluyver I'm starting to agree with your suggestion that we should forbid broadcasting in collective mode, or at least strongly restrict it. In theory you might want to do something like this:
arr = np.ones(10)
dset = fh.create_dataset('test', shape=(3, 10))
with dset.collective:
if comm.rank == 0:
dset[:2, :10] = arr # 2 chunks, yields and writes twice on MPI rank=0
else:
dset[2:, :10] = arr # 1 chunk, yields and writes once on MPI rank=1
which even with either of the changes I've tried would hang, because there's fundamentally a different number of chunks needed to be written by each. I think there's two options here:
- Disallow broadcasting when doing MPI collective writes.
- Before writing, communicate with all other ranks to determine how many chunks will be written, and then insert empty writes to keep the number the same across all ranks.
The latter seems like a bit of a hack.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if one MPI process throws an error? Because if we tried to disallow broadcasting, at least in the obvious way, I suspect rank 0 in your example would throw an error, while rank 1 would do its 1 chunk write and then wait for a matching write in rank 0 which isn't going to happen.
This is ugly. I'm trying to think about how best to deal with it, but I'm not sure I know enough about MPI.
I also notice that we should update the docs, because the section on collective I/O doesn't even mention with with dset.collective
context manager:
http://docs.h5py.org/en/stable/mpi.html#collective-versus-independent-operations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the way I was thinking of implementing this, I don't think it would break. You probably want to insist that each ranks file and memory shapes match exactly. So rank=1 wouldn't work here because it tries to write a shape=(10,) array into a shape=(2,10) selection.
However, despite that I'd probably have each rank do it's preparation for writing, and then just before writing have all ranks communicate if they have any errors and have then all throw an exception about the need to broadcast. Something like:
local_mpi_broadcast_error = (memshape != selshape)
# Reduce to see if *any* rank had an error
global_mpi_bcast_error = self.comm.allreduce(local_mpi_broadcast_error, op=MPI.MAX)
if local_mpi_broadcast_error:
raise ValueError("Cannot broadcast in MPI collective mode (mem shape=%s, file selection=%s) % ....)
if global_broadcast_error:
raise ValueError("Yikes. Another rank tried to broadcast during a write.")
# ... actually do the write now
A bit like that anyway. Let me change it again and update the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about it more, I think we can do it well enough with a simple check, without exchanging messages, which means less to go wrong. It will always be possible to create a hang if you branch on if comm.rank ...
, but we want to make it obvious that things like this are a problem:
dset = fh.create_dataset('test', shape=(10,))
chunk_start = int(comm.rank / comm.size)
chunk_end = int((comm.rank + 1) / comm.size)
with dset.collective:
dset[chunk_start:chunk_end] = comm.rank
At the moment, that will work (inefficiently: #1067) for 4 workers, but fail for 8, because some workers will write 12 values and some 13. But if we forced people to do the broadcasting with numpy beforehand, it would work:
chunk = np.full(chunk_end - chunk_start, comm.rank)
with dset.collective:
dset[chunk_start:chunk_end] = comm.rank
I think it needs to be a fairly strict check: not just that broadcast would only return one chunk, but that the dimensions of the data match those of the selection, even when those are length-1 dimensions.
And drop collective parameter from selection module
I've had a go at implementing what I was thinking of. We'll still need to work out testing before this can be merged, though. |
I had a look at this (in terms of testing mpi), and I'm not sure we're actually testing the codebase under MPI. I've kinda got something working (I'll push a PR once I've sorted out a few things). @jrs65 Did you have the |
Is this not testing with mpi? |
I think it only tests building with MPI enabled; when I looked before I didn't find anything using mpirun in the tests. |
At best, travis is currently running a MPI build of HDF5 with a MPI build of h5py (HDF5_MPI isn't passed through by tox currently, so it's possible using CC=mpicc is enough). I'm not sure which tests are being skipped though, but I've nearly got a PR ready which will show which tests are being skipped (passing -rsxX to pytest), and runs the tests under mpirun (which showed that our MPI tests don't work under mpirun with more than one process). I'll try to finish the PR tonight. |
Thanks @aragilar! |
I think this needs rebasing to test with the latest MPI support (not sure if there's anything else holding this up)? |
Collective IO where one rank attempts to read or write a zero length slice causes the a hang in an underlying collective call (issue #965). This bit me recently, so I thought I'd attempt to fix it. This PR ensures that all ranks participate in any collective IO operation even if some operations are on zero length slices. This also includes some basic unit testing for collective IO that tests for this problem.
I've attempted to limits the effects of this fix to only happen when MPI collective mode is enabled, to do this I needed to make a slight change to the API of
Selection.broadcast
to add a parameter which tells it to return zero length selections when in collective mode.Anyway, please let me know what other work this needs. Thanks!