-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ROI] Memory and running time for ROI-based illumination correction #27
Comments
A rapid test suggests this scheme is useless. The next trivial check is to play with rechunking. The first attempt (fractal-analytics-platform/fractal-client@2ee3e22) was to use The result is a small reduction of memory (new task is the green line, in the figure below), but this doesn't really make a real difference. Other options we quickly mentioned with @jluethi:
|
* Remove split_3D_indices_into_z_layers; * Remove temporary_test; * Update test.
Here is the result of a comprehensive set of local tests, which only mimic the illumination-correction task. It would be more useful to translate it into more realistic tests, but the cluster is not available at the moment. TL;DRFor this artificial example:
This is encouraging, but we need to switch to a more realistic test. Setup
def function(dummy):
return dummy.copy() + 1
def doall(shape_label, chunk_label, inline_array, compute):
in_zarr = f"../raw_{shape_label}_{chunk_label}.zarr"
out_zarr = f"out_{shape_label}_{chunk_label}"
out_zarr += f"_inline_{inline_array}_compute_{compute}.zarr"
x = da.from_zarr(in_zarr, inline_array=inline_array)
y = da.empty(x.shape, chunks=x.chunks, dtype=x.dtype)
if os.path.isdir(out_zarr):
shutil.rmtree(out_zarr)
print(out_zarr)
CS = 2000
delayed_function = dask.delayed(function)
print("shape:", x.shape)
print("chunks:", x.chunks)
print("ROI edge:", CS)
t0 = time.perf_counter()
num_ind = 0
for ind in product(list(range(0, x.shape[-2] - 1, CS)),
list(range(0, x.shape[-1] - 1, CS))):
for i_ch in range(x.shape[0]):
sy, sx = ind[:]
ex = sx + CS
ey = sy + CS
for i_z in range(x.shape[1]):
lazy_res = delayed_function(x[i_ch, i_z, sy:ey, sx:ex])
y[i_ch, i_z, sy:ey, sx:ex] = da.from_delayed(lazy_res,
shape=(CS, CS),
dtype=np.uint16)
num_ind += 1
t1 = time.perf_counter()
print(f"End of loop over {num_ind} indices, elapsed {t1-t0:.3f} s")
if compute is None:
y.to_zarr(out_zarr, dimension_separator="/")
elif compute:
y.to_zarr(out_zarr, dimension_separator="/", compute=True)
size_MB = get_dir_size(out_zarr) / 1e6
t1 = time.perf_counter()
print(f"End, elapsed {t1-t0:.3f} s")
print(f"Folder size: {size_MB:.1f} MB")
print()
assert size_MB > 1
shutil.rmtree(out_zarr) TestsWe perform two kinds of tests:
I'll describe the results of the two tests in two follow-up comments - stay tuned.. |
Test 1: dask graphs and orderingWe generated and looked at the graphs for the 8 combinations corresponding to the small array. The interesting ones are those were the ROIs and chunks are not aligned, and for simplicity we only show those without the
Take-home message: when chunks and ROIs are not aligned, inlining the Broader context: it is unclear whether this test can help us, because in principle illumination correction works in a situation where ROIs are aligned with chunks. Still, this is something to keep in mind. |
Test 2: memory profiling on large arraysFor a zarr array which takes 11 GB on disk (independently on the chunking choice), we run the test above 8 times, and we profile it with lines like from memory_profiler import memory_usage
mem = memory_usage((doall, (shape_label, chunk_label, inline_array, compute)), interval=0.1) The output is shown below. We observe that 6 cases out of 8 can be considered memory-safe (11 GB are written on disk, with a memory peak of <2.5 GB). The only two "bad" cases are those where ROIs and chunks are not aligned, and where Once again: if we are doing everything correctly, the illumination-correction task has the ROIs aligned with chunks, |
Thanks @tcompa ! Very insightful and good visualization on when dask ordering can matter. Interesting to see how much of a difference "better" ordering with I'm also very curious to see profiling of the illumination correction function with these learnings and how it differs from those tests. The dask graph in #26 already looks fairly structured (with default |
Have we measured how large your large array is vs. how large the real data for illum_corr is if it's all loaded to a numpy array? I wonder whether the real illum_corr data (3 channels instead of 2, more planes right, 19 instead of 10?) may not just be larger, e.g. the 12 GB bump there is the 2.5 GB bump here for most cases? |
And have you looked into restricting the opportunistic caching explicitly? See here: https://docs.dask.org/en/stable/caching.html |
We have now tested two sets of artificial examples on the cluster:
This means: let's forget the illumination-correction task, dask handling of this graph changes qualitatively when going from Is it because of dask being unable to optimize a large graph? Vaguely related reference: https://blog.dask.org/2018/06/26/dask-scaling-limits. PS it's useful to do this artificial test, cause in the illumination-correction case the compression of data introduces a bit more complexity.
We used this feature extensively in the past, for testing, but never found a case where it really mattered. We may try again on this one. EDIT: fixed wrong shapes |
Very good that we have a synthetic example then that reproduces this issue! :) Maybe the synthetic issue is also something we could report to dask to see if anyone has inputs there? It's certainly surprising. Just for my understanding: What happens with dask arrays of
Though we are far away from the 100s of GB or TBs that they describe as typical sizes in that post... |
My bad, these are actually the shapes we are looking at:
|
Ok, great. Then we have a synthetic example that reproduces the issue and fits in data size to our actual data! (without dealing with compression, actual function calls for illumcorr etc) |
For this data,
and with parameters {ROI/chunk aligned, inline_array=True, compute=True}, we played a bit with the cache. First tests:
|
Ok, I guess we can rule out the opportunistic caching. Thanks for testing! :) |
Here's the draft for a more self-contained discussion (which we readily turn into a dask issue). Brief descriptionWe process a 4-dimensional array by applying a simple (delayed) function to several parts of it, and then write it to disk with Detailed steps to reproduceWhat follows is with python 3.8.13 and dask 2022.7.0. We first prepare four arrays of different size, and we store them on disk as zarr files: import numpy as np
import dask.array as da
shapes = [
(2, 2, 16000, 16000),
(2, 4, 16000, 16000),
(2, 8, 16000, 16000),
(2, 16, 16000, 16000),
]
for shape in shapes:
shape_id = f"{shape}".replace(",", "").replace(" ", "_")[1:-1]
x = da.random.randint(0, 2 ** 16 - 1,
shape,
chunks=(1, 1, 2000, 2000),
dtype=np.uint16)
x.to_zarr(f"data_{shape_id}.zarr") The on-disk sizes scale as expected with the array shape (i.e. they increase by a factor of two from array to the next one):
For context: in our actual use case, these are 4-dimensional arrays (with indices which are named "channel, z, y, x") that store a bunch of 2-dimensional images coming from an optical microscope. After preparing these arrays, we process them with import sys
import numpy as np
import shutil
import dask.array as da
import dask
from memory_profiler import memory_usage
SIZE = 2000
# Function which shifts a SIZExSIZE image by one
def shift_img(img):
assert img.shape == (SIZE, SIZE)
return img.copy() + 1
delayed_shift_img = dask.delayed(shift_img)
def process_zarr(input_zarr):
out_zarr = f"out_{input_zarr}"
data_old = da.from_zarr(input_zarr)
data_new = da.empty(data_old.shape,
chunks=data_old.chunks,
dtype=data_old.dtype)
n_c, n_z, n_y, n_x = data_old.shape[:]
print(f"Input file: {input_zarr}")
print(f"Output file: {out_zarr}")
print("Array shape:", data_old.shape)
print("Array chunks:", data_old.chunks)
print(f"Image size: ({SIZE},{SIZE})")
# Loop over all images in the array
for i_x in range(0, n_x - 1, SIZE):
for i_y in range(0, n_y - 1, SIZE):
for i_c in range(n_c):
for i_z in range(n_z):
new_img = delayed_shift_img(data_old[i_c, i_z, i_y:i_y+SIZE, i_x:i_x+SIZE])
data_new[i_c, i_z, i_y:i_y+SIZE, i_x:i_x+SIZE] = da.from_delayed(new_img,
shape=(SIZE, SIZE),
dtype=data_old.dtype)
# Write data_new to disk (triggers execution)
data_new.to_zarr(out_zarr)
# Clean up output folder
shutil.rmtree(out_zarr)
if __name__ == "__main__":
input_zarr = sys.argv[1]
interval = 0.1
mem = memory_usage((process_zarr, (input_zarr,)), interval=interval)
time = np.arange(len(mem)) * interval
mem_file = "log_memory_" + input_zarr.split(".zarr")[0] + ".dat"
print(mem_file)
np.savetxt(mem_file, np.array((time, mem)).T) where we are applying the function Unexpected resultsWe expected a linear scaling of the memory footprint with the array size. Instead, we observe the following peak memory:
Apart from the first two (smallest) arrays, where we are probably looking at some fixed-size overhead, we expected a more linear growth of memory, while the two-fold array-size increase from The more detailed memory trace looks like As obtained with import numpy as np
import matplotlib.pyplot as plt
for n_z in [2, 4, 8, 16]:
f = f"log_memory_data_2_{n_z}_16000_16000.dat"
t, mem = np.loadtxt(f, unpack=True)
plt.plot(t, mem, label=f"shape=(2,{n_z},16k,16k)")
print(f, np.max(mem))
plt.xlabel("Time (s)")
plt.ylabel("Memory (MB)")
plt.legend(fontsize=8, framealpha=1)
plt.grid()
plt.savefig("fig_memory.png", dpi=256) |
I did a bit more reading before posting the above issue on dask and hit this issue: dask/distributed#5960 It's a mix of discussions about using LocalCluster, logging and memory management in general. Apparently, there were fixes to some of the memory handling issues though, so I thought it's worthwhile to test our problem with different versions of dask. The results were somewhat surprising. Using the 2022.07.0 release like above, I reproduce the results you hit @tcompa Comparing this to the 2022.01.1 release that was a workaround in the dask issue above, it doesn't seem to change anything: Thus, likely this issue wasn't actually related to the same underlying memory management problem. But I still tested it with the current dask version (2022.08.0). There were two main differences:
This happens reproducibly. @tcompa We had issues in the past where CPU usage was low for a while and then suddenly it would increase to process things. Do you remember what the issue was there? Because maybe that issue now hits us again here? Maybe that helps us trace why things go so much slower here? Also, another observation with those slow runs: Nothing is written to disk until right before the end => it's not really finishing the writing to disk process. I assume that's why the memory load increases continuously. |
The mystery continues. I wrote a mapblocks version of our synthetic example and compared the performance to the 2022.07.0 dask indexing performance above.
When running on the largest synthetic dataset, mapblocks outperforms the indexing approach in both memory and running time significantly (blue: indexing example from above. Orange: mapblocks performance). The mapblocks currently runs on the whole 4D array at a time though (but still with the same chunks?), so I'm not sure yet whether it will be slower if we apply it per channel & concatenate again / if we run it per z plane explicitly. Will test later. One very interesting observation here: The mapblocks implementation looks like it continuously writes to the zarr file, not just at the end when everything is computed. Thus, that would explain why it needs less memory, as it writes piece-by-piece to disk already, thus freeing up memory. Conclusions:
|
I couldn't figure out why the indexing approach does not seem to continuously write to the Zarr file during processing. I now created a topic on this on the dask forum, let's hope we get some feedback there: https://dask.discourse.group/t/using-da-delayed-for-zarr-processing-memory-overhead-how-to-do-it-better/1007 |
And I reported the differences in runtime for the indexing approach between |
I think our typical experience with low-CPU usage is related to IO bottlenecks (as in fractal-analytics-platform/fractal-client#57), where writing a large number of small files is a slow process because of disk performances and then it doesn't take up large CPU resources. |
Thanks @tcompa |
I think I have the solution to our ROI memory issue! If we use the Orange: Old I'll report further details and example code in the coming days, just quickly wanted to share the success 🥇 |
I reported a detailed summary and evaluation here: https://dask.discourse.group/t/using-da-delayed-for-zarr-processing-memory-overhead-how-to-do-it-better/1007/11 For reference, this is example code that now works very well for me and produces the results above: Codedef process_zarr_regions(input_zarr, inplace=False, overwrite=True):
if inplace and not overwrite:
raise Exception('If zarr is processed in place, overwrite needs to be True')
out_zarr = f"out_{input_zarr}"
data_old = da.from_zarr(input_zarr)
# Prepare output zarr file
if inplace:
new_zarr = zarr.open(input_zarr)
else:
new_zarr = zarr.create(
shape=data_old.shape,
chunks=data_old.chunksize,
dtype=data_old.dtype,
store=da.core.get_mapper(out_zarr),
overwrite=overwrite,
)
n_c, n_z, n_y, n_x = data_old.shape[:]
print(f"Input file: {input_zarr}")
print(f"Output file: {out_zarr}")
print("Array shape:", data_old.shape)
print("Array chunks:", data_old.chunks)
print(f"Image size: ({SIZE},{SIZE})")
tasks = []
regions = []
for i_c in range(n_c):
for i_z in range(n_z):
for i_y in range(0, n_y - 1, SIZE):
for i_x in range(0, n_x - 1, SIZE):
regions.append((slice(i_c, i_c+1), slice(i_z, i_z+1), slice(i_y,i_y+SIZE), slice(i_x,i_x+SIZE)))
for region in regions:
data_new = shift_img(data_old[region])
task = data_new.to_zarr(url=new_zarr, region=region, compute=False, overwrite=overwrite)
tasks.append(task)
# Compute tasks sequentially
# TODO: Figure out how to run tasks in parallel where save => batching
# (where they don't read/write from/to the same chunk)
for task in tasks:
task.compute() I think important next steps for Fractal will be:
Also, one learning for the illumination correction: Overhead seems to scale with number of ROIs. Thus, for the illumination correction, I think it will be most efficient to slightly rewrite the correction function. It should also be able to handle a 3D array and just process it plane by plane. That way, we will save a lot on the overhead :) (we should maintain the ability to run on 2D planes only, in case a user provides 2D data or applies illumination correction after an MIP) |
This looks promising, thank you! I'll look into this and come back with more detailed comments/updates. |
Let's implement the sequential running of ROIs for this issue. I moved the discussion of parallelization to this discussion issue if we want to discuss it further in the future: #44 |
We already know that ROI-parallel tasks have a computational overhead over chunk-parallel tasks. Here we report more in detail how things look like for the illumination correction task (which is also somewhat special -- see below).
TL;DR
Details
We compare the current version of the task (31fe8533e00159d42691fd1207d3174eb5d941d5) and an old chunk-parallel one (c653138cb1a37fb8e1348fdf99fa82b48a4fc987). As a test, we use a single well with 9x8 FOVs and 19 Z planes.
After some tinkering, we find that the optimal (speed-wise and memory-wise) version of the per-ROI task includes these loops:
Notice that we do not call the function
split_3D_indices_into_z_layers
(which would produce multiple ROIs for each FOV, distributed over all Z planes) any more, but we rather iterate "by hand" over the Z planes (for ind_z in range(e_z):
), and then combine thee_z=19
single-Z-plane FOVs into a single stack (data_zyx_new[s_z:e_z, s_y:e_y, s_x:e_x] = da.stack(tmp_zyx, axis=0)
). This is clearly better (in terms of both memory and speed) than an older version, see https://github.com/fractal-analytics-platform/fractal/blob/d80e83253df4d2ec6ffbadbdf52b281b3de30217/fractal/tasks/illumination_correction.py#L228-L254Results
Here is the memory trace of the two (old/new) tasks.
Detailed comments:
yokogawa_to_zarr
have a similar structure (they work image-by-image within for loops) but no memory issue (however: they are mostly based on the append/stack procedure, which is clearly better than using explicit indices).Higher-level comments:
map_blocks
parallelization, but also the append/stack/concatenate statements are recommended) has serious consequences. In the specific case of illumination correction, there is no advantage in this change, because we always work on images and we always work at level 0. The advantages should be evident in other tasks, where (1) we have more complex (or at least 3D) ROIs, or (2) we need to work at lower resolution.The text was updated successfully, but these errors were encountered: